From e75f379dcb0955725505df1b2c9bf39927215fc1 Mon Sep 17 00:00:00 2001 From: mwiegand Date: Mon, 11 May 2026 21:54:13 +0200 Subject: [PATCH] auth: reject sessions older than user.password_changed_at MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit load_current_user now treats a session whose pw_changed_at marker is missing, malformed, or older than the user's current password_changed_at as logged-out. Same shape as the existing user.active check. Forced fan-out updates to every test fixture that forges a session via session_transaction(): each now stamps a current pw_changed_at marker. test_deactivated_user_existing_session_invalidated keeps its meaning — the deactivation still flips the user to inactive, and load_current_user rejects the session via the user.active branch before reaching the freshness branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- l4d2web/auth.py | 27 +++++++++-- l4d2web/tests/test_admin_users.py | 6 +++ l4d2web/tests/test_auth.py | 50 ++++++++++++++++++++ l4d2web/tests/test_blueprints.py | 3 ++ l4d2web/tests/test_job_logs.py | 4 ++ l4d2web/tests/test_overlay_files_routes.py | 2 + l4d2web/tests/test_overlays.py | 4 ++ l4d2web/tests/test_pages.py | 10 ++++ l4d2web/tests/test_script_overlay_routes.py | 2 + l4d2web/tests/test_servers.py | 6 +++ l4d2web/tests/test_status_and_server_logs.py | 2 + l4d2web/tests/test_workshop_routes.py | 2 + 12 files changed, 115 insertions(+), 3 deletions(-) diff --git a/l4d2web/auth.py b/l4d2web/auth.py index b3250e9..1176e13 100644 --- a/l4d2web/auth.py +++ b/l4d2web/auth.py @@ -1,3 +1,4 @@ +from datetime import datetime from functools import wraps from typing import Callable, TypeVar from urllib.parse import quote, unquote @@ -39,9 +40,29 @@ def load_current_user() -> None: return with session_scope() as db: user = db.scalar(select(User).where(User.id == int(user_id))) - # Treat deactivated users as logged-out so existing sessions stop - # working as soon as an admin flips active=False. - g.user = user if user is not None and user.active else None + if user is None or not user.active: + g.user = None + return + + marker = session.get("pw_changed_at") + if marker is None: + g.user = None + return + try: + marker_dt = datetime.fromisoformat(marker) + except ValueError: + g.user = None + return + # user.password_changed_at comes back naive from SQLite; strip tz from the + # marker so an aware-marker session (just stamped from an in-memory user) + # compares cleanly with a freshly-loaded user row. + if marker_dt.tzinfo is not None: + marker_dt = marker_dt.replace(tzinfo=None) + if marker_dt < user.password_changed_at: + g.user = None + return + + g.user = user def current_user() -> User | None: diff --git a/l4d2web/tests/test_admin_users.py b/l4d2web/tests/test_admin_users.py index 5294e9e..823c64d 100644 --- a/l4d2web/tests/test_admin_users.py +++ b/l4d2web/tests/test_admin_users.py @@ -25,10 +25,12 @@ def admin_client(tmp_path, monkeypatch): db.add_all([admin, second_admin]) db.flush() admin_id = admin.id + admin_marker = admin.password_changed_at.isoformat() client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id + sess["pw_changed_at"] = admin_marker sess["csrf_token"] = "test-token" return client, admin_id @@ -124,11 +126,15 @@ def test_deactivated_user_existing_session_invalidated(admin_client): """An active session at the moment of deactivation stops working.""" client, _ = admin_client target = _add_user("bob") + with session_scope() as db: + bob = db.scalar(select(User).where(User.id == target)) + bob_marker = bob.password_changed_at.isoformat() # Forge a session for bob. bob_client = client.application.test_client() with bob_client.session_transaction() as sess: sess["user_id"] = target + sess["pw_changed_at"] = bob_marker sess["csrf_token"] = "test-token" # Sanity: bob can hit a logged-in route. diff --git a/l4d2web/tests/test_auth.py b/l4d2web/tests/test_auth.py index b7a89bc..18d83ef 100644 --- a/l4d2web/tests/test_auth.py +++ b/l4d2web/tests/test_auth.py @@ -134,6 +134,56 @@ def test_login_sets_session(client) -> None: assert sess.get("user_id") is not None +def test_load_current_user_rejects_missing_marker(client) -> None: + with session_scope() as db: + u = User(username="alice", password_digest=hash_password("secret")) + db.add(u) + db.flush() + uid = u.id + + with client.session_transaction() as sess: + sess["user_id"] = uid + + response = client.get("/dashboard") + assert response.status_code == 302 + assert "/login" in response.headers["Location"] + + +def test_load_current_user_rejects_stale_marker(client) -> None: + from datetime import UTC, datetime, timedelta + + with session_scope() as db: + u = User(username="alice", password_digest=hash_password("secret")) + db.add(u) + db.flush() + uid = u.id + + stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5) + with client.session_transaction() as sess: + sess["user_id"] = uid + sess["pw_changed_at"] = stale.isoformat() + + response = client.get("/dashboard") + assert response.status_code == 302 + assert "/login" in response.headers["Location"] + + +def test_load_current_user_accepts_current_marker(client) -> None: + with session_scope() as db: + u = User(username="alice", password_digest=hash_password("secret")) + db.add(u) + db.flush() + uid = u.id + marker = u.password_changed_at.isoformat() + + with client.session_transaction() as sess: + sess["user_id"] = uid + sess["pw_changed_at"] = marker + + response = client.get("/dashboard") + assert response.status_code == 200 + + def test_login_stamps_password_changed_at_in_session(client) -> None: with session_scope() as session: session.add(User(username="alice", password_digest=hash_password("secret"))) diff --git a/l4d2web/tests/test_blueprints.py b/l4d2web/tests/test_blueprints.py index 271eb94..20982a3 100644 --- a/l4d2web/tests/test_blueprints.py +++ b/l4d2web/tests/test_blueprints.py @@ -1,4 +1,5 @@ import json +from datetime import UTC, datetime import pytest from sqlalchemy import select @@ -31,6 +32,7 @@ def user_client(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client @@ -58,6 +60,7 @@ def linked_blueprint(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client, blueprint_id diff --git a/l4d2web/tests/test_job_logs.py b/l4d2web/tests/test_job_logs.py index 6d2842c..a28fb2c 100644 --- a/l4d2web/tests/test_job_logs.py +++ b/l4d2web/tests/test_job_logs.py @@ -1,4 +1,5 @@ from pathlib import Path +from datetime import UTC, datetime from sqlalchemy import text @@ -66,6 +67,7 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None: client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() response = client.get(f"/jobs/{job_id}/stream?last_seq=5") assert response.status_code == 200 @@ -76,6 +78,7 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None: client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() response = client.get(f"/jobs/{job_id}/stream?last_seq=5") text = response.get_data(as_text=True) @@ -91,6 +94,7 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None: client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"}) text = response.get_data(as_text=True) diff --git a/l4d2web/tests/test_overlay_files_routes.py b/l4d2web/tests/test_overlay_files_routes.py index 59984cb..b7d50be 100644 --- a/l4d2web/tests/test_overlay_files_routes.py +++ b/l4d2web/tests/test_overlay_files_routes.py @@ -1,5 +1,6 @@ """HTTP-level tests for the overlay 'Files' tree-fragment + download routes.""" from __future__ import annotations +from datetime import UTC, datetime import io import os @@ -34,6 +35,7 @@ def _client_for(app, user_id: int): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client diff --git a/l4d2web/tests/test_overlays.py b/l4d2web/tests/test_overlays.py index e71d589..76ddc9a 100644 --- a/l4d2web/tests/test_overlays.py +++ b/l4d2web/tests/test_overlays.py @@ -4,6 +4,7 @@ from l4d2web.auth import hash_password from l4d2web.db import init_db, session_scope from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User from l4d2web.services.security import validate_overlay_ref +from datetime import UTC, datetime @pytest.fixture @@ -23,6 +24,7 @@ def admin_client(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client @@ -48,6 +50,7 @@ def user_client_with_overlay(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client @@ -146,6 +149,7 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc c = app.test_client() with c.session_transaction() as sess: sess["user_id"] = uid + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return c diff --git a/l4d2web/tests/test_pages.py b/l4d2web/tests/test_pages.py index 9c6ff17..15a6803 100644 --- a/l4d2web/tests/test_pages.py +++ b/l4d2web/tests/test_pages.py @@ -1,5 +1,6 @@ import pytest from pathlib import Path +from datetime import UTC, datetime from l4d2web.app import create_app from l4d2web.auth import hash_password @@ -35,6 +36,7 @@ def auth_client_with_server(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() return client @@ -61,6 +63,7 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = owner_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() return client, blueprint_id @@ -345,6 +348,7 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None: client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() admin_page = client.get("/admin") assert admin_page.status_code == 200 @@ -380,6 +384,7 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None: client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() response = client.get(f"/jobs/{job_id}") @@ -402,6 +407,7 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None: client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"}) @@ -457,6 +463,7 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None: with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() logged_in_response = client.get("/") assert logged_in_response.status_code == 302 @@ -514,6 +521,7 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None: admin_client = app.test_client() with admin_client.session_transaction() as sess: sess["user_id"] = admin_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() with session_scope() as db: db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")) @@ -541,6 +549,7 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None: user_client = app.test_client() with user_client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() with session_scope() as db: job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued") @@ -717,6 +726,7 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = other_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() response = client.get(f"/overlays/{overlay_id}/jobs") assert response.status_code == 403 diff --git a/l4d2web/tests/test_script_overlay_routes.py b/l4d2web/tests/test_script_overlay_routes.py index feb1994..91897e7 100644 --- a/l4d2web/tests/test_script_overlay_routes.py +++ b/l4d2web/tests/test_script_overlay_routes.py @@ -1,6 +1,7 @@ """Routes for type='script' overlays: create, /script (update body), /wipe, /build. Permissions mirror workshop overlays (owner or admin).""" from __future__ import annotations +from datetime import UTC, datetime import pytest from sqlalchemy import select @@ -52,6 +53,7 @@ def _client_for(app, user_id: int): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client diff --git a/l4d2web/tests/test_servers.py b/l4d2web/tests/test_servers.py index 4321686..b08286a 100644 --- a/l4d2web/tests/test_servers.py +++ b/l4d2web/tests/test_servers.py @@ -1,4 +1,5 @@ import json +from datetime import UTC, datetime import pytest @@ -32,6 +33,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = payload["user_id"] + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return client, payload @@ -52,6 +54,7 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" response = client.get("/servers") @@ -235,6 +238,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc with client.session_transaction() as sess: sess["user_id"] = alice_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" alice_resp = client.post( "/servers", @@ -245,6 +249,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc with client.session_transaction() as sess: sess["user_id"] = bob_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" bob_resp = client.post( "/servers", @@ -318,6 +323,7 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" first = client.post( diff --git a/l4d2web/tests/test_status_and_server_logs.py b/l4d2web/tests/test_status_and_server_logs.py index 978211e..177dbca 100644 --- a/l4d2web/tests/test_status_and_server_logs.py +++ b/l4d2web/tests/test_status_and_server_logs.py @@ -1,4 +1,5 @@ import pytest +from datetime import UTC, datetime from l4d2web.app import create_app from l4d2web.auth import hash_password @@ -32,6 +33,7 @@ def owner_client_with_server(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["pw_changed_at"] = datetime.now(UTC).isoformat() return client, server_id diff --git a/l4d2web/tests/test_workshop_routes.py b/l4d2web/tests/test_workshop_routes.py index cd73b19..e66e957 100644 --- a/l4d2web/tests/test_workshop_routes.py +++ b/l4d2web/tests/test_workshop_routes.py @@ -1,6 +1,7 @@ """Tests for the workshop overlay routes (add items, remove items, build, admin refresh).""" from __future__ import annotations +from datetime import UTC, datetime from typing import Iterable from unittest.mock import patch @@ -54,6 +55,7 @@ def env_user(tmp_path, monkeypatch): c = app.test_client() with c.session_transaction() as sess: sess["user_id"] = uid + sess["pw_changed_at"] = datetime.now(UTC).isoformat() sess["csrf_token"] = "test-token" return c