import shutil from flask import Blueprint, Response, redirect, request from sqlalchemy import select from l4d2host.paths import get_left4me_root from l4d2web.auth import current_user, require_login from l4d2web.db import session_scope from l4d2web.models import BlueprintOverlay, Job, Overlay from l4d2web.services import overlay_builders from l4d2web.services.job_worker import enqueue_build_overlay from l4d2web.services.overlay_creation import ( create_overlay_directory, generate_overlay_path, ) CREATABLE_OVERLAY_TYPES = {"workshop", "script"} WIPE_SCRIPT = "find /overlay -mindepth 1 -delete" bp = Blueprint("overlay", __name__) def _is_managed_path(overlay: Overlay) -> bool: return overlay.path == str(overlay.id) def _can_edit_overlay(overlay: Overlay, user) -> bool: if user is None: return False if user.admin: return True if overlay.type in {"workshop", "script"}: return overlay.user_id == user.id return False def _name_already_taken(db, name: str, scope_user_id: int | None, *, except_id: int | None = None) -> bool: query = select(Overlay).where(Overlay.name == name) if scope_user_id is None: query = query.where(Overlay.user_id.is_(None)) else: query = query.where(Overlay.user_id == scope_user_id) if except_id is not None: query = query.where(Overlay.id != except_id) return db.scalar(query) is not None @bp.post("/overlays") @require_login def create_overlay() -> Response: user = current_user() assert user is not None name = request.form.get("name", "").strip() overlay_type = request.form.get("type", "workshop").strip().lower() system_wide = request.form.get("system_wide") == "1" if not name: return Response("missing fields", status=400) if overlay_type not in CREATABLE_OVERLAY_TYPES: return Response(f"unknown overlay type: {overlay_type}", status=400) scope_user_id: int | None = None if (system_wide and user.admin) else user.id with session_scope() as db: if _name_already_taken(db, name, scope_user_id): return Response("overlay already exists", status=409) overlay = Overlay(name=name, path="", type=overlay_type, user_id=scope_user_id) db.add(overlay) db.flush() overlay.path = generate_overlay_path(overlay.id) db.flush() create_overlay_directory(overlay) new_id = overlay.id return redirect(f"/overlays/{new_id}") @bp.post("/overlays/") @require_login def update_overlay(overlay_id: int) -> Response: user = current_user() assert user is not None name = request.form.get("name", "").strip() if not name: return Response("missing fields", status=400) with session_scope() as db: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return Response(status=404) if not _can_edit_overlay(overlay, user): return Response(status=403) if _name_already_taken(db, name, overlay.user_id, except_id=overlay_id): return Response("overlay already exists", status=409) overlay.name = name return redirect(f"/overlays/{overlay_id}") @bp.post("/overlays//delete") @require_login def delete_overlay(overlay_id: int) -> Response: user = current_user() assert user is not None with session_scope() as db: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return Response(status=404) if not _can_edit_overlay(overlay, user): return Response(status=403) in_use = db.scalar(select(BlueprintOverlay).where(BlueprintOverlay.overlay_id == overlay_id)) if in_use is not None: return Response("overlay is in use", status=409) path_value = overlay.path path_is_managed = _is_managed_path(overlay) db.delete(overlay) if path_is_managed and path_value: target = get_left4me_root() / "overlays" / path_value if target.exists(): shutil.rmtree(target) return redirect("/overlays") def _load_script_overlay(db, overlay_id: int, user) -> tuple[Overlay | None, Response | None]: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return None, Response(status=404) if overlay.type != "script": return None, Response("not a script overlay", status=400) if not _can_edit_overlay(overlay, user): return None, Response(status=403) return overlay, None @bp.post("/overlays//script") @require_login def update_script(overlay_id: int) -> Response: user = current_user() assert user is not None # HTML form submission of