14 tests covering /admin/users/<id>/{deactivate,activate,delete}:
- deactivate/activate flips and 404 on unknown user
- deactivate-self refused (409)
- deactivated user cannot log in (same 401 as wrong-password)
- existing sessions stop working after deactivation (load_current_user
returns None for inactive users → @require_login redirects to /login)
- delete-self refused (409)
- delete refuses when user owns Server, Blueprint, or custom Overlay
- delete on orphan succeeds (302 → /admin/users)
- delete nulls out Job.user_id (jobs survive as audit trail)
- delete-other-admin succeeds when more than one admin exists
The "last admin" branch in the delete endpoint is defense-in-depth and
unreachable via normal flow (any path that triggers it is shadowed by
self-delete) — covered by a comment, not a test.
260 lines
8 KiB
Python
260 lines
8 KiB
Python
import pytest
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.app import create_app
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import Blueprint, Job, Overlay, Server, User
|
|
|
|
|
|
@pytest.fixture
|
|
def admin_client(tmp_path, monkeypatch):
|
|
"""Returns a logged-in admin client + the admin's user id.
|
|
|
|
Also creates a second admin so delete-of-the-last-admin is not the
|
|
default scenario; tests that need that condition can prune.
|
|
"""
|
|
db_url = f"sqlite:///{tmp_path/'admin_users.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
|
|
with session_scope() as db:
|
|
admin = User(username="admin", password_digest=hash_password("secret"), admin=True)
|
|
second_admin = User(username="admin2", password_digest=hash_password("secret"), admin=True)
|
|
db.add_all([admin, second_admin])
|
|
db.flush()
|
|
admin_id = admin.id
|
|
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = admin_id
|
|
sess["csrf_token"] = "test-token"
|
|
return client, admin_id
|
|
|
|
|
|
def _add_user(username: str, *, admin: bool = False, active: bool = True) -> int:
|
|
with session_scope() as db:
|
|
u = User(
|
|
username=username,
|
|
password_digest=hash_password("secret"),
|
|
admin=admin,
|
|
active=active,
|
|
)
|
|
db.add(u)
|
|
db.flush()
|
|
return u.id
|
|
|
|
|
|
def _user_exists(user_id: int) -> bool:
|
|
with session_scope() as db:
|
|
return db.scalar(select(User).where(User.id == user_id)) is not None
|
|
|
|
|
|
def _user_active(user_id: int) -> bool:
|
|
with session_scope() as db:
|
|
u = db.scalar(select(User).where(User.id == user_id))
|
|
assert u is not None
|
|
return u.active
|
|
|
|
|
|
def _post(client, path: str):
|
|
return client.post(path, headers={"X-CSRF-Token": "test-token"})
|
|
|
|
|
|
# ---------------- deactivate / activate ----------------
|
|
|
|
|
|
def test_deactivate_flips_active_false(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
|
|
response = _post(client, f"/admin/users/{target}/deactivate")
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/admin/users")
|
|
assert _user_active(target) is False
|
|
|
|
|
|
def test_activate_flips_active_true(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob", active=False)
|
|
|
|
response = _post(client, f"/admin/users/{target}/activate")
|
|
|
|
assert response.status_code == 302
|
|
assert _user_active(target) is True
|
|
|
|
|
|
def test_deactivate_self_refused(admin_client):
|
|
client, admin_id = admin_client
|
|
|
|
response = _post(client, f"/admin/users/{admin_id}/deactivate")
|
|
|
|
assert response.status_code == 409
|
|
assert _user_active(admin_id) is True
|
|
|
|
|
|
def test_deactivate_unknown_user_404(admin_client):
|
|
client, _ = admin_client
|
|
|
|
response = _post(client, "/admin/users/99999/deactivate")
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_deactivated_user_cannot_log_in(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
_post(client, f"/admin/users/{target}/deactivate")
|
|
|
|
# Fresh client — different session, no admin login.
|
|
fresh = client.application.test_client()
|
|
response = fresh.post(
|
|
"/login",
|
|
data={"username": "bob", "password": "secret"},
|
|
headers={"X-CSRF-Token": "test-token"},
|
|
)
|
|
|
|
# Same response as wrong-password / unknown-user (no leak about active).
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_deactivated_user_existing_session_invalidated(admin_client):
|
|
"""An active session at the moment of deactivation stops working."""
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
|
|
# Forge a session for bob.
|
|
bob_client = client.application.test_client()
|
|
with bob_client.session_transaction() as sess:
|
|
sess["user_id"] = target
|
|
sess["csrf_token"] = "test-token"
|
|
|
|
# Sanity: bob can hit a logged-in route.
|
|
pre = bob_client.get("/dashboard")
|
|
assert pre.status_code == 200
|
|
|
|
# Admin deactivates bob.
|
|
_post(client, f"/admin/users/{target}/deactivate")
|
|
|
|
# bob's session should now be treated as logged-out → /dashboard redirects to /login.
|
|
post = bob_client.get("/dashboard", follow_redirects=False)
|
|
assert post.status_code == 302
|
|
assert "/login" in post.headers["Location"]
|
|
|
|
|
|
# ---------------- delete: refusal cases ----------------
|
|
|
|
|
|
def test_delete_self_refused(admin_client):
|
|
client, admin_id = admin_client
|
|
|
|
response = _post(client, f"/admin/users/{admin_id}/delete")
|
|
|
|
assert response.status_code == 409
|
|
assert _user_exists(admin_id)
|
|
|
|
|
|
def test_delete_other_admin_succeeds_when_more_than_one_admin(admin_client):
|
|
"""The fixture creates 2 admins; admin can delete admin2."""
|
|
client, _ = admin_client
|
|
with session_scope() as db:
|
|
admin2 = db.scalar(select(User).where(User.username == "admin2"))
|
|
admin2_id = admin2.id
|
|
|
|
response = _post(client, f"/admin/users/{admin2_id}/delete")
|
|
|
|
assert response.status_code == 302
|
|
assert not _user_exists(admin2_id)
|
|
|
|
|
|
# Note: the "last admin refused" branch in admin_users_delete is defense-
|
|
# in-depth. Through normal flow it's unreachable: a non-admin can't reach
|
|
# the endpoint (@require_admin), and an admin trying to delete the only
|
|
# remaining admin must be deleting themselves — which the self-delete
|
|
# check rejects first. The branch is kept anyway in case the auth model
|
|
# ever evolves (e.g. service accounts that bypass require_admin).
|
|
|
|
|
|
def test_delete_blocked_when_owns_servers(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
with session_scope() as db:
|
|
bp = Blueprint(user_id=target, name="bp", arguments="[]", config="[]")
|
|
db.add(bp)
|
|
db.flush()
|
|
db.add(Server(user_id=target, blueprint_id=bp.id, name="alpha", port=27015))
|
|
|
|
response = _post(client, f"/admin/users/{target}/delete")
|
|
|
|
assert response.status_code == 409
|
|
assert b"server" in response.data
|
|
assert _user_exists(target)
|
|
|
|
|
|
def test_delete_blocked_when_owns_blueprints(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
with session_scope() as db:
|
|
db.add(Blueprint(user_id=target, name="bp", arguments="[]", config="[]"))
|
|
|
|
response = _post(client, f"/admin/users/{target}/delete")
|
|
|
|
assert response.status_code == 409
|
|
assert b"blueprint" in response.data
|
|
assert _user_exists(target)
|
|
|
|
|
|
def test_delete_blocked_when_owns_custom_overlays(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
with session_scope() as db:
|
|
db.add(Overlay(name="custom", path="/opt/custom", user_id=target))
|
|
|
|
response = _post(client, f"/admin/users/{target}/delete")
|
|
|
|
assert response.status_code == 409
|
|
assert b"overlay" in response.data
|
|
assert _user_exists(target)
|
|
|
|
|
|
def test_delete_unknown_user_404(admin_client):
|
|
client, _ = admin_client
|
|
|
|
response = _post(client, "/admin/users/99999/delete")
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ---------------- delete: success cases ----------------
|
|
|
|
|
|
def test_delete_succeeds_for_orphan(admin_client):
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
|
|
response = _post(client, f"/admin/users/{target}/delete")
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/admin/users")
|
|
assert not _user_exists(target)
|
|
|
|
|
|
def test_delete_succeeds_when_user_only_owns_jobs(admin_client):
|
|
"""Job rows have nullable user_id and are kept as audit trail."""
|
|
client, _ = admin_client
|
|
target = _add_user("bob")
|
|
with session_scope() as db:
|
|
db.add(Job(user_id=target, server_id=None, operation="install", state="done"))
|
|
db.flush()
|
|
|
|
response = _post(client, f"/admin/users/{target}/delete")
|
|
|
|
assert response.status_code == 302
|
|
assert not _user_exists(target)
|
|
# The Job row survives, but its user_id is now NULL.
|
|
with session_scope() as db:
|
|
jobs = db.scalars(select(Job)).all()
|
|
assert len(jobs) == 1
|
|
assert jobs[0].user_id is None
|