diff --git a/l4d2web/app.py b/l4d2web/app.py index 5aee550..4c6aad3 100644 --- a/l4d2web/app.py +++ b/l4d2web/app.py @@ -1,3 +1,4 @@ +import hmac import os import secrets @@ -72,7 +73,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: return None token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token") - if token != session.get("csrf_token"): + expected = session.get("csrf_token") + if not token or not expected or not hmac.compare_digest(token, expected): return Response("csrf token missing or invalid", status=400) return None diff --git a/l4d2web/auth.py b/l4d2web/auth.py index 1176e13..3592770 100644 --- a/l4d2web/auth.py +++ b/l4d2web/auth.py @@ -62,6 +62,15 @@ def load_current_user() -> None: g.user = None return + # Role change since login → reject. Prevents a demoted admin from + # keeping admin until their next password change. Absence of the marker + # means a pre-upgrade session; let it through and the next login will + # stamp it. + session_admin = session.get("admin") + if session_admin is not None and bool(session_admin) != bool(user.admin): + g.user = None + return + g.user = user @@ -69,13 +78,18 @@ def current_user() -> User | None: return getattr(g, "user", None) -def login_user(user_id: int, password_changed_at) -> None: +def login_user(user_id: int, password_changed_at, admin: bool) -> None: + # Drop any pre-login session state so a fixated cookie value can't smuggle + # data past the login boundary. The next request's csrf_protect hook will + # mint a fresh csrf_token. + session.clear() session["user_id"] = user_id session["pw_changed_at"] = password_changed_at.isoformat() + session["admin"] = bool(admin) def logout_user() -> None: - session.pop("user_id", None) + session.clear() def is_safe_next(target: str | None) -> bool: diff --git a/l4d2web/routes/auth_routes.py b/l4d2web/routes/auth_routes.py index afb7c63..ce5d523 100644 --- a/l4d2web/routes/auth_routes.py +++ b/l4d2web/routes/auth_routes.py @@ -49,7 +49,7 @@ def login() -> Response: # Same generic response for missing user, wrong password, or # deactivated account — no timing oracle for deactivation status. return Response("invalid credentials", status=401) - login_user(user.id, user.password_changed_at) + login_user(user.id, user.password_changed_at, user.admin) LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None) next_target = request.form.get("next", "") return redirect(next_target if is_safe_next(next_target) else "/dashboard") diff --git a/l4d2web/tests/test_auth.py b/l4d2web/tests/test_auth.py index 38f2f07..1672a8f 100644 --- a/l4d2web/tests/test_auth.py +++ b/l4d2web/tests/test_auth.py @@ -1,4 +1,5 @@ import pytest +from sqlalchemy import select from l4d2web.app import create_app from l4d2web.auth import hash_password @@ -199,6 +200,79 @@ def test_login_stamps_password_changed_at_in_session(client) -> None: assert marker == user.password_changed_at.isoformat() +def test_login_clears_pre_login_session_state(client) -> None: + """Pre-login session keys must not survive into the authenticated session, + so a fixated cookie value cannot smuggle state past the login boundary.""" + with session_scope() as session: + session.add(User(username="alice", password_digest=hash_password("secret"))) + + with client.session_transaction() as sess: + sess["pre_login_marker"] = "smuggled" + + response = client.post("/login", data={"username": "alice", "password": "secret"}) + assert response.status_code == 302 + + with client.session_transaction() as sess: + assert sess.get("user_id") is not None + assert "pre_login_marker" not in sess + + +def test_logout_clears_session_completely(client) -> None: + """logout_user must drop pw_changed_at alongside user_id so no auth + markers linger after sign-out.""" + with session_scope() as session: + session.add(User(username="alice", password_digest=hash_password("secret"))) + + client.post("/login", data={"username": "alice", "password": "secret"}) + with client.session_transaction() as sess: + assert sess.get("user_id") is not None + assert sess.get("pw_changed_at") is not None + sess["csrf_token"] = "test-token" + + client.post("/logout", data={"csrf_token": "test-token"}) + + with client.session_transaction() as sess: + assert "user_id" not in sess + assert "pw_changed_at" not in sess + + +def test_load_current_user_rejects_role_change(client) -> None: + """If a user's admin flag changes after login, the existing session must + be rejected on the next request — preventing demoted admins from + retaining elevated access until their next password change.""" + with session_scope() as db: + u = User(username="alice", password_digest=hash_password("secret"), admin=True) + db.add(u) + db.flush() + uid = u.id + marker = u.password_changed_at.isoformat() + + with client.session_transaction() as sess: + sess["user_id"] = uid + sess["pw_changed_at"] = marker + sess["admin"] = True + + # Demote outside the session. + with session_scope() as db: + target = db.scalar(select(User).where(User.id == uid)) + target.admin = False + + response = client.get("/dashboard") + assert response.status_code == 302 + assert "/login" in response.headers["Location"] + + +def test_login_stamps_admin_flag_in_session(client) -> None: + with session_scope() as session: + session.add(User(username="alice", password_digest=hash_password("secret"), admin=True)) + + response = client.post("/login", data={"username": "alice", "password": "secret"}) + assert response.status_code == 302 + + with client.session_transaction() as sess: + assert sess.get("admin") is True + + def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None: db_url = f"sqlite:///{tmp_path/'create_user.db'}" monkeypatch.setenv("DATABASE_URL", db_url)