harden(l4d2web): auth/session — clear on login+logout, constant-time CSRF, role-change invalidation

- login_user clears any pre-login session state before stamping user_id/pw_changed_at/admin so a fixated cookie value cannot smuggle data past the login boundary
- logout_user now session.clear()s instead of only popping user_id, removing leftover pw_changed_at/admin markers
- CSRF token comparison uses hmac.compare_digest
- load_current_user rejects sessions where the stamped admin flag no longer matches the user row, preventing a demoted admin from retaining elevated access until next password change (backward-compatible: sessions issued pre-upgrade lack the marker and pass through until next login)
This commit is contained in:
mwiegand 2026-05-14 22:18:46 +02:00
parent 66d14feca5
commit 2902c9cc82
No known key found for this signature in database
4 changed files with 94 additions and 4 deletions

View file

@ -1,3 +1,4 @@
import hmac
import os
import secrets
@ -72,7 +73,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
return None
token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token")
if token != session.get("csrf_token"):
expected = session.get("csrf_token")
if not token or not expected or not hmac.compare_digest(token, expected):
return Response("csrf token missing or invalid", status=400)
return None

View file

@ -62,6 +62,15 @@ def load_current_user() -> None:
g.user = None
return
# Role change since login → reject. Prevents a demoted admin from
# keeping admin until their next password change. Absence of the marker
# means a pre-upgrade session; let it through and the next login will
# stamp it.
session_admin = session.get("admin")
if session_admin is not None and bool(session_admin) != bool(user.admin):
g.user = None
return
g.user = user
@ -69,13 +78,18 @@ def current_user() -> User | None:
return getattr(g, "user", None)
def login_user(user_id: int, password_changed_at) -> None:
def login_user(user_id: int, password_changed_at, admin: bool) -> None:
# Drop any pre-login session state so a fixated cookie value can't smuggle
# data past the login boundary. The next request's csrf_protect hook will
# mint a fresh csrf_token.
session.clear()
session["user_id"] = user_id
session["pw_changed_at"] = password_changed_at.isoformat()
session["admin"] = bool(admin)
def logout_user() -> None:
session.pop("user_id", None)
session.clear()
def is_safe_next(target: str | None) -> bool:

View file

@ -49,7 +49,7 @@ def login() -> Response:
# Same generic response for missing user, wrong password, or
# deactivated account — no timing oracle for deactivation status.
return Response("invalid credentials", status=401)
login_user(user.id, user.password_changed_at)
login_user(user.id, user.password_changed_at, user.admin)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
next_target = request.form.get("next", "")
return redirect(next_target if is_safe_next(next_target) else "/dashboard")

View file

@ -1,4 +1,5 @@
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -199,6 +200,79 @@ def test_login_stamps_password_changed_at_in_session(client) -> None:
assert marker == user.password_changed_at.isoformat()
def test_login_clears_pre_login_session_state(client) -> None:
"""Pre-login session keys must not survive into the authenticated session,
so a fixated cookie value cannot smuggle state past the login boundary."""
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
with client.session_transaction() as sess:
sess["pre_login_marker"] = "smuggled"
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
assert sess.get("user_id") is not None
assert "pre_login_marker" not in sess
def test_logout_clears_session_completely(client) -> None:
"""logout_user must drop pw_changed_at alongside user_id so no auth
markers linger after sign-out."""
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
client.post("/login", data={"username": "alice", "password": "secret"})
with client.session_transaction() as sess:
assert sess.get("user_id") is not None
assert sess.get("pw_changed_at") is not None
sess["csrf_token"] = "test-token"
client.post("/logout", data={"csrf_token": "test-token"})
with client.session_transaction() as sess:
assert "user_id" not in sess
assert "pw_changed_at" not in sess
def test_load_current_user_rejects_role_change(client) -> None:
"""If a user's admin flag changes after login, the existing session must
be rejected on the next request preventing demoted admins from
retaining elevated access until their next password change."""
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"), admin=True)
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
sess["admin"] = True
# Demote outside the session.
with session_scope() as db:
target = db.scalar(select(User).where(User.id == uid))
target.admin = False
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_login_stamps_admin_flag_in_session(client) -> None:
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret"), admin=True))
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
assert sess.get("admin") is True
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)