A 20-attempts-per-60s budget keyed by IP doesn't slow a distributed brute force that rotates source IPs. Add a parallel per-username bucket with the same threshold so a single account can't burn through more than 20 failed logins/min regardless of where they come from. Empty usernames aren't bucketed (would DoS the anonymous 401 path). Successful login clears both buckets.
79 lines
2.8 KiB
Python
79 lines
2.8 KiB
Python
from flask import Blueprint, Response, redirect, render_template, request
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
|
|
from l4d2web.db import session_scope
|
|
from l4d2web.models import User
|
|
from l4d2web.services.rate_limit import check_rate_limit
|
|
|
|
|
|
# Blueprint that the login/logout routes in this module are registered on.
bp = Blueprint("auth", __name__)

# Digest of a throwaway password, hashed once at import time. login() verifies
# the submitted password against it when no user row matches, so a hash
# verification runs on every attempt.
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")

# Budget passed to check_rate_limit for both the per-IP and the per-username
# bucket: at most MAX_ATTEMPTS within WINDOW_SECONDS.
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20

# Module-level attempt buckets handed to check_rate_limit, keyed by client
# address and by submitted username respectively. The float lists are
# presumably attempt timestamps -- confirm against
# l4d2web.services.rate_limit.
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
LOGIN_ATTEMPTS_BY_USERNAME: dict[str, list[float]] = {}
|
|
|
|
|
|
def reset_login_rate_limits() -> None:
    """Empty both login attempt buckets (per-IP and per-username) in place."""
    for bucket in (LOGIN_ATTEMPTS_BY_IP, LOGIN_ATTEMPTS_BY_USERNAME):
        bucket.clear()
|
|
|
|
|
|
def is_login_rate_limited(remote_addr: str) -> bool:
    """Check the per-IP login bucket for ``remote_addr``.

    Delegates to ``check_rate_limit`` with the module-wide window and attempt
    budget and returns its verdict unchanged.

    NOTE(review): ``check_rate_limit`` presumably also records this call as an
    attempt -- confirm against l4d2web.services.rate_limit.
    """
    limited = check_rate_limit(
        LOGIN_ATTEMPTS_BY_IP,
        remote_addr,
        window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
        max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
    )
    return limited
|
|
|
|
|
|
def is_username_rate_limited(username: str) -> bool:
    """Check the per-username login bucket for ``username``.

    Returns ``False`` for an empty username without touching the bucket;
    otherwise returns whatever ``check_rate_limit`` reports for the
    module-wide window and attempt budget.
    """
    if username:
        return check_rate_limit(
            LOGIN_ATTEMPTS_BY_USERNAME,
            username,
            window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
            max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
        )
    # Empty usernames are deliberately not bucketed: every anonymous probe
    # would share one bucket and DoS the empty-username 401 path for everyone.
    return False
|
|
|
|
|
|
@bp.get("/login")
def login_form() -> str:
    """Render the login page, forwarding ``?next=`` only when it is safe.

    A ``next`` value rejected by ``is_safe_next`` is replaced with an empty
    string before it reaches the template.
    """
    requested = request.args.get("next", "")
    if not is_safe_next(requested):
        requested = ""
    return render_template("login.html", next_target=requested)
|
|
|
|
|
|
@bp.post("/login")
def login() -> Response:
    """Handle a login form submission.

    Steps, in order:
      1. Reject with 429 if the per-IP limiter trips for the client address
         (``"unknown"`` when Flask reports none).
      2. Reject with 429 if the per-username limiter trips for the stripped
         ``username`` field.
      3. Look the user up and verify the password; answer 401 with one
         generic body for a missing user, a wrong password, or an inactive
         account.
      4. On success, start the session via ``login_user``, drop both
         rate-limit buckets for this client/username, and redirect to the
         posted ``next`` target if ``is_safe_next`` accepts it, else to
         ``/dashboard``.
    """
    client_addr = request.remote_addr or "unknown"
    if is_login_rate_limited(client_addr):
        return Response("too many login attempts", status=429)

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    if is_username_rate_limited(username):
        return Response("too many login attempts", status=429)

    # Everything that reads attributes off `user` stays inside the session
    # scope.
    with session_scope() as db:
        user = db.scalar(select(User).where(User.username == username))

        # Always verify against *some* digest so an unknown username costs
        # the same hash computation as a known one.
        if user is None:
            digest = _TIMING_DUMMY_DIGEST
        else:
            digest = user.password_digest
        password_ok = verify_password(password, digest)

        if user is None or not password_ok or not user.active:
            # Same generic response for missing user, wrong password, or
            # deactivated account -- no oracle for deactivation status.
            return Response("invalid credentials", status=401)

        login_user(user.id, user.password_changed_at, user.admin)

    # Successful login: forget the attempts recorded for this client/username.
    LOGIN_ATTEMPTS_BY_IP.pop(client_addr, None)
    LOGIN_ATTEMPTS_BY_USERNAME.pop(username, None)

    requested = request.form.get("next", "")
    destination = requested if is_safe_next(requested) else "/dashboard"
    return redirect(destination)
|
|
|
|
|
|
@bp.post("/logout")
def logout() -> Response:
    """End the current session via ``logout_user`` and redirect to /login."""
    logout_user()
    to_login = redirect("/login")
    return to_login
|