From 26a6a9d7b082a756201672758c79578b06cfa99e Mon Sep 17 00:00:00 2001 From: mwiegand Date: Mon, 11 May 2026 21:45:51 +0200 Subject: [PATCH] rate-limit: extract generic helper, reuse from login Pulled the per-IP sliding-window check out of auth_routes so the upcoming /profile/password endpoint can share it. Co-Authored-By: Claude Opus 4.7 (1M context) --- l4d2web/routes/auth_routes.py | 19 ++++++++----------- l4d2web/services/rate_limit.py | 23 +++++++++++++++++++++++ l4d2web/tests/test_rate_limit.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 11 deletions(-) create mode 100644 l4d2web/services/rate_limit.py create mode 100644 l4d2web/tests/test_rate_limit.py diff --git a/l4d2web/routes/auth_routes.py b/l4d2web/routes/auth_routes.py index 6a8559a..cf7ef4a 100644 --- a/l4d2web/routes/auth_routes.py +++ b/l4d2web/routes/auth_routes.py @@ -1,16 +1,15 @@ -import time - from flask import Blueprint, Response, redirect, render_template, request from sqlalchemy import select from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password from l4d2web.db import session_scope from l4d2web.models import User +from l4d2web.services.rate_limit import check_rate_limit bp = Blueprint("auth", __name__) _TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__") -LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60 +LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0 LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20 LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {} @@ -20,14 +19,12 @@ def reset_login_rate_limits() -> None: def is_login_rate_limited(remote_addr: str) -> bool: - now = time.time() - attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, []) - cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS - attempts[:] = [ts for ts in attempts if ts >= cutoff] - if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS: - return True - attempts.append(now) - return False + return check_rate_limit( + LOGIN_ATTEMPTS_BY_IP, + remote_addr, + window=LOGIN_RATE_LIMIT_WINDOW_SECONDS, + max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS, + ) @bp.get("/login") diff --git a/l4d2web/services/rate_limit.py b/l4d2web/services/rate_limit.py new file mode 100644 index 0000000..bfa6715 --- /dev/null +++ b/l4d2web/services/rate_limit.py @@ -0,0 +1,23 @@ +import time + + +def check_rate_limit( + bucket: dict[str, list[float]], + remote_addr: str, + *, + window: float, + max_attempts: int, +) -> bool: + """Return True if this request should be rate-limited (rejected). + + The bucket is owned by the caller so each endpoint can keep an + independent count and tests can reset state cleanly. + """ + now = time.time() + attempts = bucket.setdefault(remote_addr, []) + cutoff = now - window + attempts[:] = [ts for ts in attempts if ts >= cutoff] + if len(attempts) >= max_attempts: + return True + attempts.append(now) + return False diff --git a/l4d2web/tests/test_rate_limit.py b/l4d2web/tests/test_rate_limit.py new file mode 100644 index 0000000..a89f320 --- /dev/null +++ b/l4d2web/tests/test_rate_limit.py @@ -0,0 +1,31 @@ +import time + +from l4d2web.services.rate_limit import check_rate_limit + + +def test_under_threshold_allows(): + bucket: dict[str, list[float]] = {} + for _ in range(3): + assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False + + +def test_at_threshold_blocks(): + bucket: dict[str, list[float]] = {} + for _ in range(5): + assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False + assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True + + +def test_other_ips_independent(): + bucket: dict[str, list[float]] = {} + for _ in range(5): + check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) + assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False + + +def test_old_attempts_expire(): + bucket: dict[str, list[float]] = {} + for _ in range(5): + check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) + time.sleep(0.1) + assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False