import pytest from sqlalchemy import select from l4d2web.app import create_app from l4d2web.auth import hash_password from l4d2web.db import init_db, session_scope from l4d2web.models import Blueprint, Job, Overlay, Server, User @pytest.fixture def admin_client(tmp_path, monkeypatch): """Returns a logged-in admin client + the admin's user id. Also creates a second admin so delete-of-the-last-admin is not the default scenario; tests that need that condition can prune. """ db_url = f"sqlite:///{tmp_path/'admin_users.db'}" monkeypatch.setenv("DATABASE_URL", db_url) app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) init_db() with session_scope() as db: admin = User(username="admin", password_digest=hash_password("secret"), admin=True) second_admin = User(username="admin2", password_digest=hash_password("secret"), admin=True) db.add_all([admin, second_admin]) db.flush() admin_id = admin.id client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id sess["csrf_token"] = "test-token" return client, admin_id def _add_user(username: str, *, admin: bool = False, active: bool = True) -> int: with session_scope() as db: u = User( username=username, password_digest=hash_password("secret"), admin=admin, active=active, ) db.add(u) db.flush() return u.id def _user_exists(user_id: int) -> bool: with session_scope() as db: return db.scalar(select(User).where(User.id == user_id)) is not None def _user_active(user_id: int) -> bool: with session_scope() as db: u = db.scalar(select(User).where(User.id == user_id)) assert u is not None return u.active def _post(client, path: str): return client.post(path, headers={"X-CSRF-Token": "test-token"}) # ---------------- deactivate / activate ---------------- def test_deactivate_flips_active_false(admin_client): client, _ = admin_client target = _add_user("bob") response = _post(client, f"/admin/users/{target}/deactivate") assert response.status_code == 302 assert response.headers["Location"].endswith("/admin/users") assert _user_active(target) is False def test_activate_flips_active_true(admin_client): client, _ = admin_client target = _add_user("bob", active=False) response = _post(client, f"/admin/users/{target}/activate") assert response.status_code == 302 assert _user_active(target) is True def test_deactivate_self_refused(admin_client): client, admin_id = admin_client response = _post(client, f"/admin/users/{admin_id}/deactivate") assert response.status_code == 409 assert _user_active(admin_id) is True def test_deactivate_unknown_user_404(admin_client): client, _ = admin_client response = _post(client, "/admin/users/99999/deactivate") assert response.status_code == 404 def test_deactivated_user_cannot_log_in(admin_client): client, _ = admin_client target = _add_user("bob") _post(client, f"/admin/users/{target}/deactivate") # Fresh client — different session, no admin login. fresh = client.application.test_client() response = fresh.post( "/login", data={"username": "bob", "password": "secret"}, headers={"X-CSRF-Token": "test-token"}, ) # Same response as wrong-password / unknown-user (no leak about active). assert response.status_code == 401 def test_deactivated_user_existing_session_invalidated(admin_client): """An active session at the moment of deactivation stops working.""" client, _ = admin_client target = _add_user("bob") # Forge a session for bob. bob_client = client.application.test_client() with bob_client.session_transaction() as sess: sess["user_id"] = target sess["csrf_token"] = "test-token" # Sanity: bob can hit a logged-in route. pre = bob_client.get("/dashboard") assert pre.status_code == 200 # Admin deactivates bob. _post(client, f"/admin/users/{target}/deactivate") # bob's session should now be treated as logged-out → /dashboard redirects to /login. post = bob_client.get("/dashboard", follow_redirects=False) assert post.status_code == 302 assert "/login" in post.headers["Location"] # ---------------- delete: refusal cases ---------------- def test_delete_self_refused(admin_client): client, admin_id = admin_client response = _post(client, f"/admin/users/{admin_id}/delete") assert response.status_code == 409 assert _user_exists(admin_id) def test_delete_other_admin_succeeds_when_more_than_one_admin(admin_client): """The fixture creates 2 admins; admin can delete admin2.""" client, _ = admin_client with session_scope() as db: admin2 = db.scalar(select(User).where(User.username == "admin2")) admin2_id = admin2.id response = _post(client, f"/admin/users/{admin2_id}/delete") assert response.status_code == 302 assert not _user_exists(admin2_id) # Note: the "last admin refused" branch in admin_users_delete is defense- # in-depth. Through normal flow it's unreachable: a non-admin can't reach # the endpoint (@require_admin), and an admin trying to delete the only # remaining admin must be deleting themselves — which the self-delete # check rejects first. The branch is kept anyway in case the auth model # ever evolves (e.g. service accounts that bypass require_admin). def test_delete_blocked_when_owns_servers(admin_client): client, _ = admin_client target = _add_user("bob") with session_scope() as db: bp = Blueprint(user_id=target, name="bp", arguments="[]", config="[]") db.add(bp) db.flush() db.add(Server(user_id=target, blueprint_id=bp.id, name="alpha", port=27015)) response = _post(client, f"/admin/users/{target}/delete") assert response.status_code == 409 assert b"server" in response.data assert _user_exists(target) def test_delete_blocked_when_owns_blueprints(admin_client): client, _ = admin_client target = _add_user("bob") with session_scope() as db: db.add(Blueprint(user_id=target, name="bp", arguments="[]", config="[]")) response = _post(client, f"/admin/users/{target}/delete") assert response.status_code == 409 assert b"blueprint" in response.data assert _user_exists(target) def test_delete_blocked_when_owns_custom_overlays(admin_client): client, _ = admin_client target = _add_user("bob") with session_scope() as db: db.add(Overlay(name="custom", path="/opt/custom", user_id=target)) response = _post(client, f"/admin/users/{target}/delete") assert response.status_code == 409 assert b"overlay" in response.data assert _user_exists(target) def test_delete_unknown_user_404(admin_client): client, _ = admin_client response = _post(client, "/admin/users/99999/delete") assert response.status_code == 404 # ---------------- delete: success cases ---------------- def test_delete_succeeds_for_orphan(admin_client): client, _ = admin_client target = _add_user("bob") response = _post(client, f"/admin/users/{target}/delete") assert response.status_code == 302 assert response.headers["Location"].endswith("/admin/users") assert not _user_exists(target) def test_delete_succeeds_when_user_only_owns_jobs(admin_client): """Job rows have nullable user_id and are kept as audit trail.""" client, _ = admin_client target = _add_user("bob") with session_scope() as db: db.add(Job(user_id=target, server_id=None, operation="install", state="done")) db.flush() response = _post(client, f"/admin/users/{target}/delete") assert response.status_code == 302 assert not _user_exists(target) # The Job row survives, but its user_id is now NULL. with session_scope() as db: jobs = db.scalars(select(Job)).all() assert len(jobs) == 1 assert jobs[0].user_id is None