harden(l4d2web): per-username login rate limit alongside per-IP
A 20-attempts-per-60s budget keyed by IP doesn't slow a distributed brute force that rotates source IPs. Add a parallel per-username bucket with the same threshold so a single account can't burn through more than 20 failed logins/min regardless of where they come from. Empty usernames aren't bucketed (would DoS the anonymous 401 path). Successful login clears both buckets.
This commit is contained in:
parent
0e2a78e065
commit
bbb2b983bc
2 changed files with 57 additions and 0 deletions
|
|
@ -12,10 +12,12 @@ _TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
|||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
||||
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||
LOGIN_ATTEMPTS_BY_USERNAME: dict[str, list[float]] = {}
|
||||
|
||||
|
||||
def reset_login_rate_limits() -> None:
|
||||
LOGIN_ATTEMPTS_BY_IP.clear()
|
||||
LOGIN_ATTEMPTS_BY_USERNAME.clear()
|
||||
|
||||
|
||||
def is_login_rate_limited(remote_addr: str) -> bool:
|
||||
|
|
@ -27,6 +29,19 @@ def is_login_rate_limited(remote_addr: str) -> bool:
|
|||
)
|
||||
|
||||
|
||||
def is_username_rate_limited(username: str) -> bool:
|
||||
# Empty usernames aren't bucketed — every anonymous probe would share
|
||||
# one bucket and DoS the empty-username 401 path for everyone.
|
||||
if not username:
|
||||
return False
|
||||
return check_rate_limit(
|
||||
LOGIN_ATTEMPTS_BY_USERNAME,
|
||||
username,
|
||||
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
|
||||
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
)
|
||||
|
||||
|
||||
@bp.get("/login")
|
||||
def login_form() -> str:
|
||||
next_target = request.args.get("next", "")
|
||||
|
|
@ -41,6 +56,8 @@ def login() -> Response:
|
|||
|
||||
username = request.form.get("username", "").strip()
|
||||
password = request.form.get("password", "")
|
||||
if is_username_rate_limited(username):
|
||||
return Response("too many login attempts", status=429)
|
||||
with session_scope() as db:
|
||||
user = db.scalar(select(User).where(User.username == username))
|
||||
digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST
|
||||
|
|
@ -51,6 +68,7 @@ def login() -> Response:
|
|||
return Response("invalid credentials", status=401)
|
||||
login_user(user.id, user.password_changed_at, user.admin)
|
||||
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
||||
LOGIN_ATTEMPTS_BY_USERNAME.pop(username, None)
|
||||
next_target = request.form.get("next", "")
|
||||
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,45 @@ def test_login_rate_limit(client) -> None:
|
|||
assert response.status_code == 429
|
||||
|
||||
|
||||
def test_login_rate_limit_per_username_across_ips(client) -> None:
|
||||
"""A distributed brute-force that rotates the source IP must still be
|
||||
slowed once the per-username budget for a single account is exhausted."""
|
||||
for i in range(20):
|
||||
client.post(
|
||||
"/login",
|
||||
data={"username": "alice", "password": "wrong"},
|
||||
environ_overrides={"REMOTE_ADDR": f"10.0.0.{i}"},
|
||||
)
|
||||
|
||||
response = client.post(
|
||||
"/login",
|
||||
data={"username": "alice", "password": "wrong"},
|
||||
environ_overrides={"REMOTE_ADDR": "10.99.99.99"},
|
||||
)
|
||||
assert response.status_code == 429
|
||||
|
||||
|
||||
def test_login_rate_limit_empty_username_does_not_bucket(client) -> None:
|
||||
"""Anonymous probes with no username must not all share one bucket — that
|
||||
would let an attacker DoS legitimate empty-username 401s. We just don't
|
||||
track them per-username."""
|
||||
for i in range(25):
|
||||
client.post(
|
||||
"/login",
|
||||
data={"username": "", "password": "wrong"},
|
||||
environ_overrides={"REMOTE_ADDR": f"10.1.0.{i}"},
|
||||
)
|
||||
|
||||
# A real username from a fresh IP must still get through to the
|
||||
# 401 path (i.e. not be 429ed by empty-username spillover).
|
||||
response = client.post(
|
||||
"/login",
|
||||
data={"username": "alice", "password": "wrong"},
|
||||
environ_overrides={"REMOTE_ADDR": "10.2.0.1"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_security_headers_present(client) -> None:
|
||||
response = client.get("/login")
|
||||
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
|
|
|
|||
Loading…
Reference in a new issue