diff --git a/l4d2web/routes/auth_routes.py b/l4d2web/routes/auth_routes.py index ce5d523..e2efc6f 100644 --- a/l4d2web/routes/auth_routes.py +++ b/l4d2web/routes/auth_routes.py @@ -12,10 +12,12 @@ _TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__") LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0 LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20 LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {} +LOGIN_ATTEMPTS_BY_USERNAME: dict[str, list[float]] = {} def reset_login_rate_limits() -> None: LOGIN_ATTEMPTS_BY_IP.clear() + LOGIN_ATTEMPTS_BY_USERNAME.clear() def is_login_rate_limited(remote_addr: str) -> bool: @@ -27,6 +29,19 @@ def is_login_rate_limited(remote_addr: str) -> bool: ) +def is_username_rate_limited(username: str) -> bool: + # Empty usernames aren't bucketed — every anonymous probe would share + # one bucket and DoS the empty-username 401 path for everyone. + if not username: + return False + return check_rate_limit( + LOGIN_ATTEMPTS_BY_USERNAME, + username, + window=LOGIN_RATE_LIMIT_WINDOW_SECONDS, + max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS, + ) + + @bp.get("/login") def login_form() -> str: next_target = request.args.get("next", "") @@ -41,6 +56,8 @@ def login() -> Response: username = request.form.get("username", "").strip() password = request.form.get("password", "") + if is_username_rate_limited(username): + return Response("too many login attempts", status=429) with session_scope() as db: user = db.scalar(select(User).where(User.username == username)) digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST @@ -51,6 +68,7 @@ def login() -> Response: return Response("invalid credentials", status=401) login_user(user.id, user.password_changed_at, user.admin) LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None) + LOGIN_ATTEMPTS_BY_USERNAME.pop(username, None) next_target = request.form.get("next", "") return redirect(next_target if is_safe_next(next_target) else "/dashboard") diff --git a/l4d2web/tests/test_security.py b/l4d2web/tests/test_security.py index 31d2323..7e1cdc4 100644 --- a/l4d2web/tests/test_security.py +++ b/l4d2web/tests/test_security.py @@ -30,6 +30,45 @@ def test_login_rate_limit(client) -> None: assert response.status_code == 429 +def test_login_rate_limit_per_username_across_ips(client) -> None: + """A distributed brute-force that rotates the source IP must still be + slowed once the per-username budget for a single account is exhausted.""" + for i in range(20): + client.post( + "/login", + data={"username": "alice", "password": "wrong"}, + environ_overrides={"REMOTE_ADDR": f"10.0.0.{i}"}, + ) + + response = client.post( + "/login", + data={"username": "alice", "password": "wrong"}, + environ_overrides={"REMOTE_ADDR": "10.99.99.99"}, + ) + assert response.status_code == 429 + + +def test_login_rate_limit_empty_username_does_not_bucket(client) -> None: + """Anonymous probes with no username must not all share one bucket — that + would let an attacker DoS legitimate empty-username 401s. We just don't + track them per-username.""" + for i in range(25): + client.post( + "/login", + data={"username": "", "password": "wrong"}, + environ_overrides={"REMOTE_ADDR": f"10.1.0.{i}"}, + ) + + # A real username from a fresh IP must still get through to the + # 401 path (i.e. not be 429ed by empty-username spillover). + response = client.post( + "/login", + data={"username": "alice", "password": "wrong"}, + environ_overrides={"REMOTE_ADDR": "10.2.0.1"}, + ) + assert response.status_code == 401 + + def test_security_headers_present(client) -> None: response = client.get("/login") assert response.headers.get("X-Content-Type-Options") == "nosniff"