auth: reject sessions older than user.password_changed_at

load_current_user now treats a session whose pw_changed_at marker
is missing, malformed, or older than the user's current
password_changed_at as logged-out. Same shape as the existing
user.active check.

Forced fan-out updates to every test fixture that forges a session
via session_transaction(): each now stamps a current pw_changed_at
marker. test_deactivated_user_existing_session_invalidated keeps
its meaning — the deactivation still flips the user to inactive,
and load_current_user rejects the session via the user.active
branch before reaching the freshness branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
mwiegand 2026-05-11 21:54:13 +02:00
parent 84dc672180
commit e75f379dcb
No known key found for this signature in database
12 changed files with 115 additions and 3 deletions

View file

@ -1,3 +1,4 @@
from datetime import datetime
from functools import wraps
from typing import Callable, TypeVar
from urllib.parse import quote, unquote
@ -39,9 +40,29 @@ def load_current_user() -> None:
return
with session_scope() as db:
user = db.scalar(select(User).where(User.id == int(user_id)))
# Treat deactivated users as logged-out so existing sessions stop
# working as soon as an admin flips active=False.
g.user = user if user is not None and user.active else None
if user is None or not user.active:
g.user = None
return
marker = session.get("pw_changed_at")
if marker is None:
g.user = None
return
try:
marker_dt = datetime.fromisoformat(marker)
except ValueError:
g.user = None
return
# user.password_changed_at comes back naive from SQLite; strip tz from the
# marker so an aware-marker session (just stamped from an in-memory user)
# compares cleanly with a freshly-loaded user row.
if marker_dt.tzinfo is not None:
marker_dt = marker_dt.replace(tzinfo=None)
if marker_dt < user.password_changed_at:
g.user = None
return
g.user = user
def current_user() -> User | None:

View file

@ -25,10 +25,12 @@ def admin_client(tmp_path, monkeypatch):
db.add_all([admin, second_admin])
db.flush()
admin_id = admin.id
admin_marker = admin.password_changed_at.isoformat()
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = admin_marker
sess["csrf_token"] = "test-token"
return client, admin_id
@ -124,11 +126,15 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
"""An active session at the moment of deactivation stops working."""
client, _ = admin_client
target = _add_user("bob")
with session_scope() as db:
bob = db.scalar(select(User).where(User.id == target))
bob_marker = bob.password_changed_at.isoformat()
# Forge a session for bob.
bob_client = client.application.test_client()
with bob_client.session_transaction() as sess:
sess["user_id"] = target
sess["pw_changed_at"] = bob_marker
sess["csrf_token"] = "test-token"
# Sanity: bob can hit a logged-in route.

View file

@ -134,6 +134,56 @@ def test_login_sets_session(client) -> None:
assert sess.get("user_id") is not None
def test_load_current_user_rejects_missing_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
with client.session_transaction() as sess:
sess["user_id"] = uid
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_rejects_stale_marker(client) -> None:
from datetime import UTC, datetime, timedelta
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = stale.isoformat()
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_accepts_current_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
response = client.get("/dashboard")
assert response.status_code == 200
def test_login_stamps_password_changed_at_in_session(client) -> None:
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))

View file

@ -1,4 +1,5 @@
import json
from datetime import UTC, datetime
import pytest
from sqlalchemy import select
@ -31,6 +32,7 @@ def user_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -58,6 +60,7 @@ def linked_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client, blueprint_id

View file

@ -1,4 +1,5 @@
from pathlib import Path
from datetime import UTC, datetime
from sqlalchemy import text
@ -66,6 +67,7 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
assert response.status_code == 200
@ -76,6 +78,7 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
text = response.get_data(as_text=True)
@ -91,6 +94,7 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
text = response.get_data(as_text=True)

View file

@ -1,5 +1,6 @@
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
from __future__ import annotations
from datetime import UTC, datetime
import io
import os
@ -34,6 +35,7 @@ def _client_for(app, user_id: int):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client

View file

@ -4,6 +4,7 @@ from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
from l4d2web.services.security import validate_overlay_ref
from datetime import UTC, datetime
@pytest.fixture
@ -23,6 +24,7 @@ def admin_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -48,6 +50,7 @@ def user_client_with_overlay(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -146,6 +149,7 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
c = app.test_client()
with c.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return c

View file

@ -1,5 +1,6 @@
import pytest
from pathlib import Path
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -35,6 +36,7 @@ def auth_client_with_server(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client
@ -61,6 +63,7 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = owner_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, blueprint_id
@ -345,6 +348,7 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
admin_page = client.get("/admin")
assert admin_page.status_code == 200
@ -380,6 +384,7 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}")
@ -402,6 +407,7 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
@ -457,6 +463,7 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
logged_in_response = client.get("/")
assert logged_in_response.status_code == 302
@ -514,6 +521,7 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
admin_client = app.test_client()
with admin_client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db:
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
@ -541,6 +549,7 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
user_client = app.test_client()
with user_client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db:
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
@ -717,6 +726,7 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = other_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/overlays/{overlay_id}/jobs")
assert response.status_code == 403

View file

@ -1,6 +1,7 @@
"""Routes for type='script' overlays: create, /script (update body),
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
from __future__ import annotations
from datetime import UTC, datetime
import pytest
from sqlalchemy import select
@ -52,6 +53,7 @@ def _client_for(app, user_id: int):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client

View file

@ -1,4 +1,5 @@
import json
from datetime import UTC, datetime
import pytest
@ -32,6 +33,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = payload["user_id"]
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client, payload
@ -52,6 +54,7 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
response = client.get("/servers")
@ -235,6 +238,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess:
sess["user_id"] = alice_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
alice_resp = client.post(
"/servers",
@ -245,6 +249,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess:
sess["user_id"] = bob_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
bob_resp = client.post(
"/servers",
@ -318,6 +323,7 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
first = client.post(

View file

@ -1,4 +1,5 @@
import pytest
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -32,6 +33,7 @@ def owner_client_with_server(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, server_id

View file

@ -1,6 +1,7 @@
"""Tests for the workshop overlay routes (add items, remove items, build,
admin refresh)."""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Iterable
from unittest.mock import patch
@ -54,6 +55,7 @@ def env_user(tmp_path, monkeypatch):
c = app.test_client()
with c.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return c