tests: admin user management
14 tests covering /admin/users/<id>/{deactivate,activate,delete}:
- deactivate/activate flips and 404 on unknown user
- deactivate-self refused (409)
- deactivated user cannot log in (same 401 as wrong-password)
- existing sessions stop working after deactivation (load_current_user
returns None for inactive users → @require_login redirects to /login)
- delete-self refused (409)
- delete refuses when user owns Server, Blueprint, or custom Overlay
- delete on orphan succeeds (302 → /admin/users)
- delete nulls out Job.user_id (jobs survive as audit trail)
- delete-other-admin succeeds when more than one admin exists
The "last admin" branch in the delete endpoint is defense-in-depth and
unreachable via normal flow (any path that triggers it is shadowed by
self-delete) — covered by a comment, not a test.
This commit is contained in:
parent
bcea450e98
commit
c594d4b5e8
1 changed files with 260 additions and 0 deletions
260
l4d2web/tests/test_admin_users.py
Normal file
260
l4d2web/tests/test_admin_users.py
Normal file
|
|
@ -0,0 +1,260 @@
|
|||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import Blueprint, Job, Overlay, Server, User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_client(tmp_path, monkeypatch):
|
||||
"""Returns a logged-in admin client + the admin's user id.
|
||||
|
||||
Also creates a second admin so delete-of-the-last-admin is not the
|
||||
default scenario; tests that need that condition can prune.
|
||||
"""
|
||||
db_url = f"sqlite:///{tmp_path/'admin_users.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
with session_scope() as db:
|
||||
admin = User(username="admin", password_digest=hash_password("secret"), admin=True)
|
||||
second_admin = User(username="admin2", password_digest=hash_password("secret"), admin=True)
|
||||
db.add_all([admin, second_admin])
|
||||
db.flush()
|
||||
admin_id = admin.id
|
||||
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client, admin_id
|
||||
|
||||
|
||||
def _add_user(username: str, *, admin: bool = False, active: bool = True) -> int:
|
||||
with session_scope() as db:
|
||||
u = User(
|
||||
username=username,
|
||||
password_digest=hash_password("secret"),
|
||||
admin=admin,
|
||||
active=active,
|
||||
)
|
||||
db.add(u)
|
||||
db.flush()
|
||||
return u.id
|
||||
|
||||
|
||||
def _user_exists(user_id: int) -> bool:
|
||||
with session_scope() as db:
|
||||
return db.scalar(select(User).where(User.id == user_id)) is not None
|
||||
|
||||
|
||||
def _user_active(user_id: int) -> bool:
|
||||
with session_scope() as db:
|
||||
u = db.scalar(select(User).where(User.id == user_id))
|
||||
assert u is not None
|
||||
return u.active
|
||||
|
||||
|
||||
def _post(client, path: str):
|
||||
return client.post(path, headers={"X-CSRF-Token": "test-token"})
|
||||
|
||||
|
||||
# ---------------- deactivate / activate ----------------
|
||||
|
||||
|
||||
def test_deactivate_flips_active_false(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/deactivate")
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"].endswith("/admin/users")
|
||||
assert _user_active(target) is False
|
||||
|
||||
|
||||
def test_activate_flips_active_true(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob", active=False)
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/activate")
|
||||
|
||||
assert response.status_code == 302
|
||||
assert _user_active(target) is True
|
||||
|
||||
|
||||
def test_deactivate_self_refused(admin_client):
|
||||
client, admin_id = admin_client
|
||||
|
||||
response = _post(client, f"/admin/users/{admin_id}/deactivate")
|
||||
|
||||
assert response.status_code == 409
|
||||
assert _user_active(admin_id) is True
|
||||
|
||||
|
||||
def test_deactivate_unknown_user_404(admin_client):
|
||||
client, _ = admin_client
|
||||
|
||||
response = _post(client, "/admin/users/99999/deactivate")
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_deactivated_user_cannot_log_in(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
_post(client, f"/admin/users/{target}/deactivate")
|
||||
|
||||
# Fresh client — different session, no admin login.
|
||||
fresh = client.application.test_client()
|
||||
response = fresh.post(
|
||||
"/login",
|
||||
data={"username": "bob", "password": "secret"},
|
||||
headers={"X-CSRF-Token": "test-token"},
|
||||
)
|
||||
|
||||
# Same response as wrong-password / unknown-user (no leak about active).
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_deactivated_user_existing_session_invalidated(admin_client):
|
||||
"""An active session at the moment of deactivation stops working."""
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
|
||||
# Forge a session for bob.
|
||||
bob_client = client.application.test_client()
|
||||
with bob_client.session_transaction() as sess:
|
||||
sess["user_id"] = target
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
# Sanity: bob can hit a logged-in route.
|
||||
pre = bob_client.get("/dashboard")
|
||||
assert pre.status_code == 200
|
||||
|
||||
# Admin deactivates bob.
|
||||
_post(client, f"/admin/users/{target}/deactivate")
|
||||
|
||||
# bob's session should now be treated as logged-out → /dashboard redirects to /login.
|
||||
post = bob_client.get("/dashboard", follow_redirects=False)
|
||||
assert post.status_code == 302
|
||||
assert "/login" in post.headers["Location"]
|
||||
|
||||
|
||||
# ---------------- delete: refusal cases ----------------
|
||||
|
||||
|
||||
def test_delete_self_refused(admin_client):
|
||||
client, admin_id = admin_client
|
||||
|
||||
response = _post(client, f"/admin/users/{admin_id}/delete")
|
||||
|
||||
assert response.status_code == 409
|
||||
assert _user_exists(admin_id)
|
||||
|
||||
|
||||
def test_delete_other_admin_succeeds_when_more_than_one_admin(admin_client):
|
||||
"""The fixture creates 2 admins; admin can delete admin2."""
|
||||
client, _ = admin_client
|
||||
with session_scope() as db:
|
||||
admin2 = db.scalar(select(User).where(User.username == "admin2"))
|
||||
admin2_id = admin2.id
|
||||
|
||||
response = _post(client, f"/admin/users/{admin2_id}/delete")
|
||||
|
||||
assert response.status_code == 302
|
||||
assert not _user_exists(admin2_id)
|
||||
|
||||
|
||||
# Note: the "last admin refused" branch in admin_users_delete is defense-
|
||||
# in-depth. Through normal flow it's unreachable: a non-admin can't reach
|
||||
# the endpoint (@require_admin), and an admin trying to delete the only
|
||||
# remaining admin must be deleting themselves — which the self-delete
|
||||
# check rejects first. The branch is kept anyway in case the auth model
|
||||
# ever evolves (e.g. service accounts that bypass require_admin).
|
||||
|
||||
|
||||
def test_delete_blocked_when_owns_servers(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
with session_scope() as db:
|
||||
bp = Blueprint(user_id=target, name="bp", arguments="[]", config="[]")
|
||||
db.add(bp)
|
||||
db.flush()
|
||||
db.add(Server(user_id=target, blueprint_id=bp.id, name="alpha", port=27015))
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/delete")
|
||||
|
||||
assert response.status_code == 409
|
||||
assert b"server" in response.data
|
||||
assert _user_exists(target)
|
||||
|
||||
|
||||
def test_delete_blocked_when_owns_blueprints(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
with session_scope() as db:
|
||||
db.add(Blueprint(user_id=target, name="bp", arguments="[]", config="[]"))
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/delete")
|
||||
|
||||
assert response.status_code == 409
|
||||
assert b"blueprint" in response.data
|
||||
assert _user_exists(target)
|
||||
|
||||
|
||||
def test_delete_blocked_when_owns_custom_overlays(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
with session_scope() as db:
|
||||
db.add(Overlay(name="custom", path="/opt/custom", user_id=target))
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/delete")
|
||||
|
||||
assert response.status_code == 409
|
||||
assert b"overlay" in response.data
|
||||
assert _user_exists(target)
|
||||
|
||||
|
||||
def test_delete_unknown_user_404(admin_client):
|
||||
client, _ = admin_client
|
||||
|
||||
response = _post(client, "/admin/users/99999/delete")
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
# ---------------- delete: success cases ----------------
|
||||
|
||||
|
||||
def test_delete_succeeds_for_orphan(admin_client):
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/delete")
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"].endswith("/admin/users")
|
||||
assert not _user_exists(target)
|
||||
|
||||
|
||||
def test_delete_succeeds_when_user_only_owns_jobs(admin_client):
|
||||
"""Job rows have nullable user_id and are kept as audit trail."""
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
with session_scope() as db:
|
||||
db.add(Job(user_id=target, server_id=None, operation="install", state="done"))
|
||||
db.flush()
|
||||
|
||||
response = _post(client, f"/admin/users/{target}/delete")
|
||||
|
||||
assert response.status_code == 302
|
||||
assert not _user_exists(target)
|
||||
# The Job row survives, but its user_id is now NULL.
|
||||
with session_scope() as db:
|
||||
jobs = db.scalars(select(Job)).all()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0].user_id is None
|
||||
Loading…
Reference in a new issue