"""Overlay builder registry. Each `Overlay.type` maps to a builder. `build_overlay(overlay_id)` jobs (and the synchronous `initialize_server` hook) dispatch through `BUILDERS`. Adding a new overlay type means writing a new builder and registering it here — no changes to the worker, the mount layer, or the blueprint editor. """ from __future__ import annotations import os import subprocess import tempfile from pathlib import Path from typing import Callable, Protocol from sqlalchemy import select from l4d2host.paths import get_left4me_root from l4d2web.db import session_scope from l4d2web.models import Overlay, OverlayWorkshopItem, WorkshopItem from l4d2web.services.host_commands import run_command from l4d2web.services.workshop_paths import cache_path, workshop_cache_root CancelCheck = Callable[[], bool] LogSink = Callable[[str], None] SCRIPT_SANDBOX_HELPER = "/usr/local/libexec/left4me/left4me-script-sandbox" DISK_BUDGET_BYTES = 20 * 1024**3 def _sandbox_script_dir() -> Path: """Where script tmpfiles live before being bind-mounted into the sandbox. Cannot live in /tmp because the web service unit has PrivateTmp=yes: its /tmp is a per-instance namespace that PID 1 (which actually performs the BindReadOnlyPaths during sandbox setup) cannot resolve. /var/lib is not affected by PrivateTmp and is visible to PID 1, so the bind-mount succeeds. """ return get_left4me_root() / "sandbox-scripts" class BuildError(RuntimeError): """Raised by builders when a build fails for a builder-specific reason (e.g. disk-budget exceeded). Distinct from subprocess-level HostCommandError / CommandCancelledError.""" class OverlayBuilder(Protocol): def build( self, overlay: Overlay, *, on_stdout: LogSink, on_stderr: LogSink, should_cancel: CancelCheck, ) -> None: ... def _overlay_root(overlay: Overlay) -> Path: return get_left4me_root() / "overlays" / overlay.path def overlay_path_for_id(overlay_id: int) -> Path: return get_left4me_root() / "overlays" / str(overlay_id) class WorkshopBuilder: """Diff-apply symlinks under `left4dead2/addons/` against the overlay's current `WorkshopItem` associations. Cached items get an absolute symlink into `workshop_cache/{steam_id}.vpk`. Items missing from cache are skipped with a warning rather than turned into broken symlinks.""" def build( self, overlay: Overlay, *, on_stdout: LogSink, on_stderr: LogSink, should_cancel: CancelCheck, ) -> None: addons_dir = _overlay_root(overlay) / "left4dead2" / "addons" addons_dir.mkdir(parents=True, exist_ok=True) with session_scope() as db: items = db.scalars( select(WorkshopItem) .join( OverlayWorkshopItem, OverlayWorkshopItem.workshop_item_id == WorkshopItem.id, ) .where(OverlayWorkshopItem.overlay_id == overlay.id) ).all() # Detach items so we can use them outside the session. items_data = [ (it.steam_id, it.last_downloaded_at) for it in items ] cache_root = workshop_cache_root() # desired: symlink-name -> absolute target path (only for cached items) desired: dict[str, Path] = {} skipped: list[str] = [] for steam_id, last_downloaded_at in items_data: target = cache_path(steam_id) if last_downloaded_at is None or not target.exists(): skipped.append(steam_id) continue desired[f"{steam_id}.vpk"] = target.resolve() if should_cancel(): on_stderr("workshop build cancelled before applying symlinks") return # existing: symlink-name -> link target (only for symlinks pointing at our cache) existing: dict[str, Path] = {} for entry in os.scandir(addons_dir): if not entry.is_symlink(): continue try: target = Path(os.readlink(entry.path)) except OSError: continue try: resolved = target.resolve(strict=False) except OSError: continue if not _is_under(resolved, cache_root): continue existing[entry.name] = resolved created = 0 removed = 0 unchanged = 0 # Remove obsolete or stale symlinks first. for name, current_target in existing.items(): if should_cancel(): on_stderr("workshop build cancelled mid-removal") return desired_target = desired.get(name) if desired_target is None: os.unlink(addons_dir / name) removed += 1 elif current_target != desired_target: os.unlink(addons_dir / name) # will be recreated below else: unchanged += 1 # Recompute existing post-removal so the create loop knows what's left. post_removal_existing = { name for name in existing if name in desired and existing[name] == desired[name] } # Create new symlinks. for name, target in desired.items(): if should_cancel(): on_stderr("workshop build cancelled mid-creation") return if name in post_removal_existing: continue link_path = addons_dir / name # Defensive: if a non-symlink file collides with our name, leave it. if link_path.exists() and not link_path.is_symlink(): on_stderr( f"refusing to overwrite non-symlink at {link_path}; manual intervention required" ) continue if link_path.is_symlink(): # An obsolete symlink not in `existing` (target outside cache). # We don't manage these — leave alone. on_stderr( f"refusing to overwrite foreign symlink at {link_path}" ) continue os.symlink(str(target), str(link_path)) created += 1 on_stdout( f"workshop overlay {overlay.name!r}: created={created} " f"removed={removed} unchanged={unchanged} " f"skipped(uncached)={len(skipped)}" ) for steam_id in skipped: on_stderr( f"workshop item {steam_id} skipped: not yet downloaded " f"(refresh required before this overlay can mount it)" ) def run_sandboxed_script( overlay_id: int, script_text: str, *, on_stdout: LogSink, on_stderr: LogSink, should_cancel: CancelCheck, ) -> None: """Write `script_text` to a tmpfile and exec it inside the privileged sandbox helper. Used by ScriptBuilder.build and by the wipe route.""" script_dir = _sandbox_script_dir() script_dir.mkdir(parents=True, exist_ok=True) with tempfile.NamedTemporaryFile( "w", suffix=".sh", delete=False, dir=str(script_dir) ) as f: f.write(script_text or "") script_path = f.name # NamedTemporaryFile creates 0600 owned by the web user; the sandbox runs # as l4d2-sandbox and needs to read it (bind-mounted at /script.sh inside # the sandbox). Script content is not a secret — it's plain bash stored # in the DB and editable by the user — so 0644 is appropriate. os.chmod(script_path, 0o644) try: cmd = [ "sudo", "-n", SCRIPT_SANDBOX_HELPER, str(overlay_id), script_path, ] run_command( cmd, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel, ) finally: try: os.unlink(script_path) except FileNotFoundError: pass class ScriptBuilder: """Run an arbitrary user-authored bash script against the overlay dir inside a bubblewrap + systemd-run sandbox. The script sees the overlay dir as RW `/overlay` and a curated host RO mount; everything else is isolated. After exit, enforce a 20 GB cap on `du -sb /overlay`.""" def build( self, overlay: Overlay, *, on_stdout: LogSink, on_stderr: LogSink, should_cancel: CancelCheck, ) -> None: # Ensure target dir exists so the helper's bind-mount validation passes. overlay_path_for_id(overlay.id).mkdir(parents=True, exist_ok=True) run_sandboxed_script( overlay.id, overlay.script or "", on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel, ) self._enforce_disk_budget(overlay.id, on_stderr) def _enforce_disk_budget(self, overlay_id: int, on_stderr: LogSink) -> None: target = overlay_path_for_id(overlay_id) size_output = subprocess.check_output(["du", "-sb", str(target)]) size_bytes = int(size_output.split()[0]) if size_bytes > DISK_BUDGET_BYTES: on_stderr( f"overlay exceeded 20 GB disk cap: {size_bytes} bytes > " f"{DISK_BUDGET_BYTES} bytes" ) raise BuildError("disk-cap-exceeded") def _is_under(path: Path, root: Path) -> bool: try: path.relative_to(root) except ValueError: return False return True BUILDERS: dict[str, OverlayBuilder] = { "workshop": WorkshopBuilder(), "script": ScriptBuilder(), }