diff --git a/l4d2web/tests/test_admin_users.py b/l4d2web/tests/test_admin_users.py new file mode 100644 index 0000000..5294e9e --- /dev/null +++ b/l4d2web/tests/test_admin_users.py @@ -0,0 +1,260 @@ +import pytest +from sqlalchemy import select + +from l4d2web.app import create_app +from l4d2web.auth import hash_password +from l4d2web.db import init_db, session_scope +from l4d2web.models import Blueprint, Job, Overlay, Server, User + + +@pytest.fixture +def admin_client(tmp_path, monkeypatch): + """Returns a logged-in admin client + the admin's user id. + + Also creates a second admin so delete-of-the-last-admin is not the + default scenario; tests that need that condition can prune. + """ + db_url = f"sqlite:///{tmp_path/'admin_users.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + + with session_scope() as db: + admin = User(username="admin", password_digest=hash_password("secret"), admin=True) + second_admin = User(username="admin2", password_digest=hash_password("secret"), admin=True) + db.add_all([admin, second_admin]) + db.flush() + admin_id = admin.id + + client = app.test_client() + with client.session_transaction() as sess: + sess["user_id"] = admin_id + sess["csrf_token"] = "test-token" + return client, admin_id + + +def _add_user(username: str, *, admin: bool = False, active: bool = True) -> int: + with session_scope() as db: + u = User( + username=username, + password_digest=hash_password("secret"), + admin=admin, + active=active, + ) + db.add(u) + db.flush() + return u.id + + +def _user_exists(user_id: int) -> bool: + with session_scope() as db: + return db.scalar(select(User).where(User.id == user_id)) is not None + + +def _user_active(user_id: int) -> bool: + with session_scope() as db: + u = db.scalar(select(User).where(User.id == user_id)) + assert u is not None + return u.active + + +def _post(client, path: str): + return client.post(path, headers={"X-CSRF-Token": "test-token"}) + + +# ---------------- deactivate / activate ---------------- + + +def test_deactivate_flips_active_false(admin_client): + client, _ = admin_client + target = _add_user("bob") + + response = _post(client, f"/admin/users/{target}/deactivate") + + assert response.status_code == 302 + assert response.headers["Location"].endswith("/admin/users") + assert _user_active(target) is False + + +def test_activate_flips_active_true(admin_client): + client, _ = admin_client + target = _add_user("bob", active=False) + + response = _post(client, f"/admin/users/{target}/activate") + + assert response.status_code == 302 + assert _user_active(target) is True + + +def test_deactivate_self_refused(admin_client): + client, admin_id = admin_client + + response = _post(client, f"/admin/users/{admin_id}/deactivate") + + assert response.status_code == 409 + assert _user_active(admin_id) is True + + +def test_deactivate_unknown_user_404(admin_client): + client, _ = admin_client + + response = _post(client, "/admin/users/99999/deactivate") + + assert response.status_code == 404 + + +def test_deactivated_user_cannot_log_in(admin_client): + client, _ = admin_client + target = _add_user("bob") + _post(client, f"/admin/users/{target}/deactivate") + + # Fresh client — different session, no admin login. + fresh = client.application.test_client() + response = fresh.post( + "/login", + data={"username": "bob", "password": "secret"}, + headers={"X-CSRF-Token": "test-token"}, + ) + + # Same response as wrong-password / unknown-user (no leak about active). + assert response.status_code == 401 + + +def test_deactivated_user_existing_session_invalidated(admin_client): + """An active session at the moment of deactivation stops working.""" + client, _ = admin_client + target = _add_user("bob") + + # Forge a session for bob. + bob_client = client.application.test_client() + with bob_client.session_transaction() as sess: + sess["user_id"] = target + sess["csrf_token"] = "test-token" + + # Sanity: bob can hit a logged-in route. + pre = bob_client.get("/dashboard") + assert pre.status_code == 200 + + # Admin deactivates bob. + _post(client, f"/admin/users/{target}/deactivate") + + # bob's session should now be treated as logged-out → /dashboard redirects to /login. + post = bob_client.get("/dashboard", follow_redirects=False) + assert post.status_code == 302 + assert "/login" in post.headers["Location"] + + +# ---------------- delete: refusal cases ---------------- + + +def test_delete_self_refused(admin_client): + client, admin_id = admin_client + + response = _post(client, f"/admin/users/{admin_id}/delete") + + assert response.status_code == 409 + assert _user_exists(admin_id) + + +def test_delete_other_admin_succeeds_when_more_than_one_admin(admin_client): + """The fixture creates 2 admins; admin can delete admin2.""" + client, _ = admin_client + with session_scope() as db: + admin2 = db.scalar(select(User).where(User.username == "admin2")) + admin2_id = admin2.id + + response = _post(client, f"/admin/users/{admin2_id}/delete") + + assert response.status_code == 302 + assert not _user_exists(admin2_id) + + +# Note: the "last admin refused" branch in admin_users_delete is defense- +# in-depth. Through normal flow it's unreachable: a non-admin can't reach +# the endpoint (@require_admin), and an admin trying to delete the only +# remaining admin must be deleting themselves — which the self-delete +# check rejects first. The branch is kept anyway in case the auth model +# ever evolves (e.g. service accounts that bypass require_admin). + + +def test_delete_blocked_when_owns_servers(admin_client): + client, _ = admin_client + target = _add_user("bob") + with session_scope() as db: + bp = Blueprint(user_id=target, name="bp", arguments="[]", config="[]") + db.add(bp) + db.flush() + db.add(Server(user_id=target, blueprint_id=bp.id, name="alpha", port=27015)) + + response = _post(client, f"/admin/users/{target}/delete") + + assert response.status_code == 409 + assert b"server" in response.data + assert _user_exists(target) + + +def test_delete_blocked_when_owns_blueprints(admin_client): + client, _ = admin_client + target = _add_user("bob") + with session_scope() as db: + db.add(Blueprint(user_id=target, name="bp", arguments="[]", config="[]")) + + response = _post(client, f"/admin/users/{target}/delete") + + assert response.status_code == 409 + assert b"blueprint" in response.data + assert _user_exists(target) + + +def test_delete_blocked_when_owns_custom_overlays(admin_client): + client, _ = admin_client + target = _add_user("bob") + with session_scope() as db: + db.add(Overlay(name="custom", path="/opt/custom", user_id=target)) + + response = _post(client, f"/admin/users/{target}/delete") + + assert response.status_code == 409 + assert b"overlay" in response.data + assert _user_exists(target) + + +def test_delete_unknown_user_404(admin_client): + client, _ = admin_client + + response = _post(client, "/admin/users/99999/delete") + + assert response.status_code == 404 + + +# ---------------- delete: success cases ---------------- + + +def test_delete_succeeds_for_orphan(admin_client): + client, _ = admin_client + target = _add_user("bob") + + response = _post(client, f"/admin/users/{target}/delete") + + assert response.status_code == 302 + assert response.headers["Location"].endswith("/admin/users") + assert not _user_exists(target) + + +def test_delete_succeeds_when_user_only_owns_jobs(admin_client): + """Job rows have nullable user_id and are kept as audit trail.""" + client, _ = admin_client + target = _add_user("bob") + with session_scope() as db: + db.add(Job(user_id=target, server_id=None, operation="install", state="done")) + db.flush() + + response = _post(client, f"/admin/users/{target}/delete") + + assert response.status_code == 302 + assert not _user_exists(target) + # The Job row survives, but its user_id is now NULL. + with session_scope() as db: + jobs = db.scalars(select(Job)).all() + assert len(jobs) == 1 + assert jobs[0].user_id is None