left4me/l4d2web/tests/test_admin_users.py
mwiegand e75f379dcb
auth: reject sessions older than user.password_changed_at
load_current_user now treats a session whose pw_changed_at marker
is missing, malformed, or older than the user's current
password_changed_at as logged-out. Same shape as the existing
user.active check.

Forced fan-out updates to every test fixture that forges a session
via session_transaction(): each now stamps a current pw_changed_at
marker. test_deactivated_user_existing_session_invalidated keeps
its meaning — the deactivation still flips the user to inactive,
and load_current_user rejects the session via the user.active
branch before reaching the freshness branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:54:13 +02:00

266 lines
8.3 KiB
Python

import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, Job, Overlay, Server, User
@pytest.fixture
def admin_client(tmp_path, monkeypatch):
    """Provide a test client carrying a forged admin session, plus that admin's id.

    Two admin accounts are seeded so that "deleting the last admin" is not
    the default situation; a test that needs that condition can remove the
    extra account itself.
    """
    database_url = f"sqlite:///{tmp_path/'admin_users.db'}"
    monkeypatch.setenv("DATABASE_URL", database_url)
    app = create_app(
        {"TESTING": True, "DATABASE_URL": database_url, "SECRET_KEY": "test"}
    )
    init_db()

    with session_scope() as db:
        primary = User(
            username="admin",
            password_digest=hash_password("secret"),
            admin=True,
        )
        secondary = User(
            username="admin2",
            password_digest=hash_password("secret"),
            admin=True,
        )
        db.add_all([primary, secondary])
        # Flush before reading the id and password_changed_at back
        # (presumably both are populated by the database on insert).
        db.flush()
        admin_id = primary.id
        admin_marker = primary.password_changed_at.isoformat()

    client = app.test_client()
    # Forge the session directly instead of going through /login.
    forged_session = {
        "user_id": admin_id,
        "pw_changed_at": admin_marker,
        "csrf_token": "test-token",
    }
    with client.session_transaction() as sess:
        for key, value in forged_session.items():
            sess[key] = value

    return client, admin_id
def _add_user(username: str, *, admin: bool = False, active: bool = True) -> int:
    """Insert a user whose password is ``"secret"`` and return the new row's id.

    Args:
        username: Login name for the new account.
        admin: Value stored in the user's ``admin`` flag.
        active: Value stored in the user's ``active`` flag.
    """
    with session_scope() as db:
        attributes = {
            "username": username,
            "password_digest": hash_password("secret"),
            "admin": admin,
            "active": active,
        }
        new_user = User(**attributes)
        db.add(new_user)
        # Flush so the id can be read while the session is still open.
        db.flush()
        return new_user.id
def _user_exists(user_id: int) -> bool:
    """Return True when a ``User`` row with the given id is present."""
    lookup = select(User).where(User.id == user_id)
    with session_scope() as db:
        row = db.scalar(lookup)
        return row is not None
def _user_active(user_id: int) -> bool:
    """Return the ``active`` flag of the user with the given id.

    Fails with an ``AssertionError`` when no such user exists.
    """
    lookup = select(User).where(User.id == user_id)
    with session_scope() as db:
        found = db.scalar(lookup)
        assert found is not None
        return found.active
def _post(client, path: str):
    """POST to *path* with the CSRF header matching the token the fixture stores."""
    csrf_headers = {"X-CSRF-Token": "test-token"}
    return client.post(path, headers=csrf_headers)
# ---------------- deactivate / activate ----------------
def test_deactivate_flips_active_false(admin_client):
    """Deactivating an active user redirects to the user list and clears ``active``."""
    client, _admin_id = admin_client
    bob_id = _add_user("bob")

    resp = _post(client, f"/admin/users/{bob_id}/deactivate")

    assert resp.status_code == 302
    redirect_target = resp.headers["Location"]
    assert redirect_target.endswith("/admin/users")
    assert _user_active(bob_id) is False
def test_activate_flips_active_true(admin_client):
    """Activating an inactive user redirects and sets ``active`` back to True."""
    client, _admin_id = admin_client
    bob_id = _add_user("bob", active=False)

    resp = _post(client, f"/admin/users/{bob_id}/activate")

    assert resp.status_code == 302
    assert _user_active(bob_id) is True
def test_deactivate_self_refused(admin_client):
    """An admin deactivating their own account gets 409 and stays active."""
    client, own_id = admin_client

    resp = _post(client, f"/admin/users/{own_id}/deactivate")

    assert resp.status_code == 409
    assert _user_active(own_id) is True
def test_deactivate_unknown_user_404(admin_client):
    """Deactivating an id that no user has yields 404."""
    client, _admin_id = admin_client

    resp = _post(client, "/admin/users/99999/deactivate")

    assert resp.status_code == 404
def test_deactivated_user_cannot_log_in(admin_client):
    """A deactivated account is answered with 401 on /login despite a correct password."""
    client, _admin_id = admin_client
    bob_id = _add_user("bob")
    _post(client, f"/admin/users/{bob_id}/deactivate")

    # A separate test client has its own session, so no admin login is carried over.
    anonymous = client.application.test_client()
    credentials = {"username": "bob", "password": "secret"}
    resp = anonymous.post(
        "/login",
        data=credentials,
        headers={"X-CSRF-Token": "test-token"},
    )

    # Expected to be the same status as wrong-password / unknown-user,
    # so the response leaks nothing about the account's active flag.
    assert resp.status_code == 401
def test_deactivated_user_existing_session_invalidated(admin_client):
    """A session that was valid before deactivation is rejected afterwards."""
    client, _admin_id = admin_client
    bob_id = _add_user("bob")

    # Read bob's current password marker so the forged session counts as fresh.
    with session_scope() as db:
        bob_row = db.scalar(select(User).where(User.id == bob_id))
        bob_marker = bob_row.password_changed_at.isoformat()

    # Forge a logged-in session for bob on his own client.
    bob_client = client.application.test_client()
    forged_session = {
        "user_id": bob_id,
        "pw_changed_at": bob_marker,
        "csrf_token": "test-token",
    }
    with bob_client.session_transaction() as sess:
        for key, value in forged_session.items():
            sess[key] = value

    # Sanity check: before deactivation bob can reach a logged-in route.
    before = bob_client.get("/dashboard")
    assert before.status_code == 200

    # The admin deactivates bob while bob's session still exists.
    _post(client, f"/admin/users/{bob_id}/deactivate")

    # bob is now treated as logged-out: /dashboard redirects to /login.
    after = bob_client.get("/dashboard", follow_redirects=False)
    assert after.status_code == 302
    assert "/login" in after.headers["Location"]
# ---------------- delete: refusal cases ----------------
def test_delete_self_refused(admin_client):
    """An admin deleting their own account gets 409 and the row remains."""
    client, own_id = admin_client

    resp = _post(client, f"/admin/users/{own_id}/delete")

    assert resp.status_code == 409
    assert _user_exists(own_id)
def test_delete_other_admin_succeeds_when_more_than_one_admin(admin_client):
    """With the fixture's two admins present, "admin" may delete "admin2"."""
    client, _admin_id = admin_client

    by_username = select(User).where(User.username == "admin2")
    with session_scope() as db:
        other_admin = db.scalar(by_username)
        other_admin_id = other_admin.id

    resp = _post(client, f"/admin/users/{other_admin_id}/delete")

    assert resp.status_code == 302
    assert not _user_exists(other_admin_id)
# Note: the "last admin refused" branch in admin_users_delete is defense-
# in-depth. Through normal flow it's unreachable: a non-admin can't reach
# the endpoint (@require_admin), and an admin trying to delete the only
# remaining admin must be deleting themselves — which the self-delete
# check rejects first. The branch is kept anyway in case the auth model
# ever evolves (e.g. service accounts that bypass require_admin).
def test_delete_blocked_when_owns_servers(admin_client):
    """Deleting a user who owns a server is refused with 409 and a "server" message."""
    client, _admin_id = admin_client
    owner_id = _add_user("bob")

    with session_scope() as db:
        blueprint = Blueprint(user_id=owner_id, name="bp", arguments="[]", config="[]")
        db.add(blueprint)
        # Flush first: the server row below needs the blueprint's id.
        db.flush()
        server = Server(
            user_id=owner_id,
            blueprint_id=blueprint.id,
            name="alpha",
            port=27015,
        )
        db.add(server)

    resp = _post(client, f"/admin/users/{owner_id}/delete")

    assert resp.status_code == 409
    assert b"server" in resp.data
    assert _user_exists(owner_id)
def test_delete_blocked_when_owns_blueprints(admin_client):
    """Deleting a user who owns a blueprint is refused with 409 and a "blueprint" message."""
    client, _admin_id = admin_client
    owner_id = _add_user("bob")

    with session_scope() as db:
        blueprint = Blueprint(user_id=owner_id, name="bp", arguments="[]", config="[]")
        db.add(blueprint)

    resp = _post(client, f"/admin/users/{owner_id}/delete")

    assert resp.status_code == 409
    assert b"blueprint" in resp.data
    assert _user_exists(owner_id)
def test_delete_blocked_when_owns_custom_overlays(admin_client):
    """Deleting a user who owns an overlay is refused with 409 and an "overlay" message."""
    client, _admin_id = admin_client
    owner_id = _add_user("bob")

    with session_scope() as db:
        overlay = Overlay(name="custom", path="/opt/custom", user_id=owner_id)
        db.add(overlay)

    resp = _post(client, f"/admin/users/{owner_id}/delete")

    assert resp.status_code == 409
    assert b"overlay" in resp.data
    assert _user_exists(owner_id)
def test_delete_unknown_user_404(admin_client):
    """Deleting an id that no user has yields 404."""
    client, _admin_id = admin_client

    resp = _post(client, "/admin/users/99999/delete")

    assert resp.status_code == 404
# ---------------- delete: success cases ----------------
def test_delete_succeeds_for_orphan(admin_client):
    """A user who owns nothing is deleted and the admin is redirected to the user list."""
    client, _admin_id = admin_client
    orphan_id = _add_user("bob")

    resp = _post(client, f"/admin/users/{orphan_id}/delete")

    assert resp.status_code == 302
    redirect_target = resp.headers["Location"]
    assert redirect_target.endswith("/admin/users")
    assert not _user_exists(orphan_id)
def test_delete_succeeds_when_user_only_owns_jobs(admin_client):
    """Owning only jobs does not block deletion; the job is kept as an audit trail."""
    client, _admin_id = admin_client
    owner_id = _add_user("bob")

    with session_scope() as db:
        finished_job = Job(
            user_id=owner_id,
            server_id=None,
            operation="install",
            state="done",
        )
        db.add(finished_job)
        db.flush()

    resp = _post(client, f"/admin/users/{owner_id}/delete")

    assert resp.status_code == 302
    assert not _user_exists(owner_id)

    # The job row outlives its owner, with user_id reset to NULL.
    with session_scope() as db:
        remaining = db.scalars(select(Job)).all()
        assert len(remaining) == 1
        survivor = remaining[0]
        assert survivor.user_id is None