login_user now records the user's current password_changed_at on the session. The next commit will use this marker to invalidate sessions whose password has been rotated under them. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
61 lines
2.2 KiB
Python
61 lines
2.2 KiB
Python
from flask import Blueprint, Response, redirect, render_template, request
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
|
|
from l4d2web.db import session_scope
|
|
from l4d2web.models import User
|
|
from l4d2web.services.rate_limit import check_rate_limit
|
|
|
|
|
|
bp = Blueprint("auth", __name__)
|
|
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
|
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
|
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
|
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
|
|
|
|
|
def reset_login_rate_limits() -> None:
|
|
LOGIN_ATTEMPTS_BY_IP.clear()
|
|
|
|
|
|
def is_login_rate_limited(remote_addr: str) -> bool:
|
|
return check_rate_limit(
|
|
LOGIN_ATTEMPTS_BY_IP,
|
|
remote_addr,
|
|
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
|
|
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
|
|
)
|
|
|
|
|
|
@bp.get("/login")
|
|
def login_form() -> str:
|
|
next_target = request.args.get("next", "")
|
|
return render_template("login.html", next_target=next_target if is_safe_next(next_target) else "")
|
|
|
|
|
|
@bp.post("/login")
|
|
def login() -> Response:
|
|
remote_addr = request.remote_addr or "unknown"
|
|
if is_login_rate_limited(remote_addr):
|
|
return Response("too many login attempts", status=429)
|
|
|
|
username = request.form.get("username", "").strip()
|
|
password = request.form.get("password", "")
|
|
with session_scope() as db:
|
|
user = db.scalar(select(User).where(User.username == username))
|
|
digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST
|
|
password_ok = verify_password(password, digest)
|
|
if user is None or not password_ok or not user.active:
|
|
# Same generic response for missing user, wrong password, or
|
|
# deactivated account — no timing oracle for deactivation status.
|
|
return Response("invalid credentials", status=401)
|
|
login_user(user.id, user.password_changed_at)
|
|
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
|
next_target = request.form.get("next", "")
|
|
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
|
|
|
|
|
@bp.post("/logout")
|
|
def logout() -> Response:
|
|
logout_user()
|
|
return redirect("/login")
|