diff --git a/l4d2web/routes/overlay_routes.py b/l4d2web/routes/overlay_routes.py index 012971e..6a8ac5f 100644 --- a/l4d2web/routes/overlay_routes.py +++ b/l4d2web/routes/overlay_routes.py @@ -7,7 +7,9 @@ from l4d2host.paths import get_left4me_root from l4d2web.auth import current_user, require_login from l4d2web.db import session_scope -from l4d2web.models import BlueprintOverlay, Overlay +from l4d2web.models import BlueprintOverlay, Job, Overlay +from l4d2web.services import overlay_builders +from l4d2web.services.job_worker import enqueue_build_overlay from l4d2web.services.overlay_creation import ( create_overlay_directory, generate_overlay_path, @@ -15,6 +17,7 @@ from l4d2web.services.overlay_creation import ( CREATABLE_OVERLAY_TYPES = {"workshop", "script"} +WIPE_SCRIPT = "find /overlay -mindepth 1 -delete" bp = Blueprint("overlay", __name__) @@ -53,12 +56,13 @@ def create_overlay() -> Response: name = request.form.get("name", "").strip() overlay_type = request.form.get("type", "workshop").strip().lower() + system_wide = request.form.get("system_wide") == "1" if not name: return Response("missing fields", status=400) if overlay_type not in CREATABLE_OVERLAY_TYPES: return Response(f"unknown overlay type: {overlay_type}", status=400) - scope_user_id: int | None = user.id + scope_user_id: int | None = None if (system_wide and user.admin) else user.id with session_scope() as db: if _name_already_taken(db, name, scope_user_id): @@ -123,3 +127,80 @@ def delete_overlay(overlay_id: int) -> Response: shutil.rmtree(target) return redirect("/overlays") + + +def _load_script_overlay(db, overlay_id: int, user) -> tuple[Overlay | None, Response | None]: + overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) + if overlay is None: + return None, Response(status=404) + if overlay.type != "script": + return None, Response("not a script overlay", status=400) + if not _can_edit_overlay(overlay, user): + return None, Response(status=403) + return overlay, None + + +@bp.post("/overlays//script") +@require_login +def update_script(overlay_id: int) -> Response: + user = current_user() + assert user is not None + script_text = request.form.get("script", "") + with session_scope() as db: + overlay, err = _load_script_overlay(db, overlay_id, user) + if err is not None: + return err + overlay.script = script_text + enqueue_build_overlay(db, overlay_id=overlay_id, user_id=user.id) + return redirect(f"/overlays/{overlay_id}") + + +@bp.post("/overlays//build") +@require_login +def manual_build(overlay_id: int) -> Response: + user = current_user() + assert user is not None + with session_scope() as db: + overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) + if overlay is None: + return Response(status=404) + if not _can_edit_overlay(overlay, user): + return Response(status=403) + job = enqueue_build_overlay(db, overlay_id=overlay_id, user_id=user.id) + job_id = job.id + return redirect(f"/jobs/{job_id}") + + +@bp.post("/overlays//wipe") +@require_login +def wipe_overlay(overlay_id: int) -> Response: + user = current_user() + assert user is not None + with session_scope() as db: + overlay, err = _load_script_overlay(db, overlay_id, user) + if err is not None: + return err + running = db.scalar( + select(Job).where( + Job.operation == "build_overlay", + Job.overlay_id == overlay_id, + Job.state.in_({"running", "cancelling"}), + ) + ) + if running is not None: + return Response("build in progress for this overlay", status=409) + + overlay_builders.run_sandboxed_script( + overlay_id, + WIPE_SCRIPT, + on_stdout=lambda _line: None, + on_stderr=lambda _line: None, + should_cancel=lambda: False, + ) + + with session_scope() as db: + overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) + if overlay is not None: + overlay.last_build_status = "" + + return redirect(f"/overlays/{overlay_id}") diff --git a/l4d2web/routes/workshop_routes.py b/l4d2web/routes/workshop_routes.py index 7685a22..9b653a6 100644 --- a/l4d2web/routes/workshop_routes.py +++ b/l4d2web/routes/workshop_routes.py @@ -142,20 +142,6 @@ def remove_item(overlay_id: int, item_id: int) -> Response: return _render_item_table(overlay_id) -@bp.post("/overlays//build") -@require_login -def manual_build(overlay_id: int) -> Response: - user = current_user() - assert user is not None - with session_scope() as db: - overlay, err = _check_workshop_overlay_access(overlay_id, user, db) - if err is not None: - return err - job = enqueue_build_overlay(db, overlay_id=overlay_id, user_id=user.id) - job_id = job.id - return redirect(f"/jobs/{job_id}") - - @bp.post("/admin/workshop/refresh") @require_admin def admin_refresh() -> Response: diff --git a/l4d2web/tests/test_script_overlay_routes.py b/l4d2web/tests/test_script_overlay_routes.py new file mode 100644 index 0000000..d54f361 --- /dev/null +++ b/l4d2web/tests/test_script_overlay_routes.py @@ -0,0 +1,256 @@ +"""Routes for type='script' overlays: create, /script (update body), +/wipe, /build. Permissions mirror workshop overlays (owner or admin).""" +from __future__ import annotations + +import pytest +from sqlalchemy import select + +from l4d2web.app import create_app +from l4d2web.auth import hash_password +from l4d2web.db import init_db, session_scope +from l4d2web.models import Job, Overlay, User + + +@pytest.fixture +def app(tmp_path, monkeypatch): + db_url = f"sqlite:///{tmp_path/'script-routes.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path)) + flask_app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + return flask_app + + +@pytest.fixture +def alice_id(app) -> int: + with session_scope() as s: + user = User(username="alice", password_digest=hash_password("x"), admin=False) + s.add(user) + s.flush() + return user.id + + +@pytest.fixture +def bob_id(app) -> int: + with session_scope() as s: + user = User(username="bob", password_digest=hash_password("x"), admin=False) + s.add(user) + s.flush() + return user.id + + +@pytest.fixture +def admin_id(app) -> int: + with session_scope() as s: + user = User(username="admin", password_digest=hash_password("x"), admin=True) + s.add(user) + s.flush() + return user.id + + +def _client_for(app, user_id: int): + client = app.test_client() + with client.session_transaction() as sess: + sess["user_id"] = user_id + sess["csrf_token"] = "test-token" + return client + + +def _create_script_overlay(app, user_id: int, *, name: str = "x") -> int: + client = _client_for(app, user_id) + response = client.post( + "/overlays", + data={"name": name, "type": "script"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert response.status_code == 302, response.get_data(as_text=True) + with session_scope() as s: + return s.scalar(select(Overlay.id).where(Overlay.name == name)) + + +def test_create_script_overlay(app, alice_id) -> None: + client = _client_for(app, alice_id) + response = client.post( + "/overlays", + data={"name": "first", "type": "script"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert response.status_code == 302 + with session_scope() as s: + overlay = s.query(Overlay).filter_by(name="first").one() + assert overlay.type == "script" + assert overlay.script == "" + assert overlay.last_build_status == "" + assert overlay.user_id == alice_id + assert overlay.path == str(overlay.id) + + +def test_admin_creates_system_wide_script_overlay(app, admin_id) -> None: + client = _client_for(app, admin_id) + response = client.post( + "/overlays", + data={"name": "system", "type": "script", "system_wide": "1"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert response.status_code == 302 + with session_scope() as s: + overlay = s.query(Overlay).filter_by(name="system").one() + assert overlay.user_id is None + + +def test_non_admin_system_wide_flag_is_ignored(app, alice_id) -> None: + client = _client_for(app, alice_id) + response = client.post( + "/overlays", + data={"name": "evil", "type": "script", "system_wide": "1"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert response.status_code == 302 + with session_scope() as s: + overlay = s.query(Overlay).filter_by(name="evil").one() + assert overlay.user_id == alice_id + + +def test_update_script_body_enqueues_build(app, alice_id) -> None: + overlay_id = _create_script_overlay(app, alice_id) + client = _client_for(app, alice_id) + + r1 = client.post( + f"/overlays/{overlay_id}/script", + data={"script": "echo new"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert r1.status_code == 302 + with session_scope() as s: + overlay = s.query(Overlay).filter_by(id=overlay_id).one() + assert overlay.script == "echo new" + jobs = s.query(Job).filter_by(operation="build_overlay", overlay_id=overlay_id).all() + assert len(jobs) == 1 + + # Coalesce against pending. + r2 = client.post( + f"/overlays/{overlay_id}/script", + data={"script": "echo newer"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert r2.status_code == 302 + with session_scope() as s: + jobs = s.query(Job).filter_by(operation="build_overlay", overlay_id=overlay_id).all() + assert len(jobs) == 1 + + +def test_manual_rebuild(app, alice_id) -> None: + overlay_id = _create_script_overlay(app, alice_id) + client = _client_for(app, alice_id) + + r1 = client.post( + f"/overlays/{overlay_id}/build", + headers={"X-CSRF-Token": "test-token"}, + ) + assert r1.status_code == 302 + with session_scope() as s: + jobs = s.query(Job).filter_by(operation="build_overlay", overlay_id=overlay_id).all() + assert len(jobs) == 1 + + # Coalesce. + r2 = client.post( + f"/overlays/{overlay_id}/build", + headers={"X-CSRF-Token": "test-token"}, + ) + assert r2.status_code == 302 + with session_scope() as s: + jobs = s.query(Job).filter_by(operation="build_overlay", overlay_id=overlay_id).all() + assert len(jobs) == 1 + + +def test_wipe_runs_find_delete(app, alice_id, monkeypatch) -> None: + overlay_id = _create_script_overlay(app, alice_id) + + # Pre-set a "successful" status so we can verify wipe resets it. + with session_scope() as s: + overlay = s.query(Overlay).filter_by(id=overlay_id).one() + overlay.last_build_status = "ok" + + captured: dict = {} + + def fake_run(overlay_id_arg, script_text, *, on_stdout, on_stderr, should_cancel): + captured["overlay_id"] = overlay_id_arg + captured["script"] = script_text + + from l4d2web.services import overlay_builders + monkeypatch.setattr(overlay_builders, "run_sandboxed_script", fake_run) + + client = _client_for(app, alice_id) + response = client.post( + f"/overlays/{overlay_id}/wipe", + headers={"X-CSRF-Token": "test-token"}, + ) + assert response.status_code == 302 + assert captured["overlay_id"] == overlay_id + assert captured["script"] == "find /overlay -mindepth 1 -delete" + + with session_scope() as s: + overlay = s.query(Overlay).filter_by(id=overlay_id).one() + assert overlay.last_build_status == "" + # Wipe does NOT auto-enqueue a rebuild. + jobs = s.query(Job).filter_by(operation="build_overlay", overlay_id=overlay_id).all() + assert len(jobs) == 0 + + +def test_wipe_refuses_during_running_build(app, alice_id, monkeypatch) -> None: + overlay_id = _create_script_overlay(app, alice_id) + + # Mark a build as running for this overlay. + with session_scope() as s: + s.add( + Job( + user_id=alice_id, + server_id=None, + overlay_id=overlay_id, + operation="build_overlay", + state="running", + ) + ) + + invocations: list = [] + + def fake_run(*args, **kwargs): + invocations.append((args, kwargs)) + + from l4d2web.services import overlay_builders + monkeypatch.setattr(overlay_builders, "run_sandboxed_script", fake_run) + + client = _client_for(app, alice_id) + response = client.post( + f"/overlays/{overlay_id}/wipe", + headers={"X-CSRF-Token": "test-token"}, + ) + assert response.status_code == 409 + assert invocations == [] + + +def test_permissions_non_owner_denied(app, alice_id, bob_id) -> None: + overlay_id = _create_script_overlay(app, alice_id, name="alice-private") + bob = _client_for(app, bob_id) + + r1 = bob.post( + f"/overlays/{overlay_id}/script", + data={"script": "boom"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert r1.status_code == 403 + + +def test_permissions_admin_can_edit_any(app, alice_id, admin_id) -> None: + overlay_id = _create_script_overlay(app, alice_id, name="alice-private") + admin = _client_for(app, admin_id) + + r1 = admin.post( + f"/overlays/{overlay_id}/script", + data={"script": "echo admin"}, + headers={"X-CSRF-Token": "test-token"}, + ) + assert r1.status_code == 302 + with session_scope() as s: + overlay = s.query(Overlay).filter_by(id=overlay_id).one() + assert overlay.script == "echo admin"