left4me/l4d2web/services/overlay_builders.py
mwiegand 8971b23617
refactor(sandbox): collapse l4d2-sandbox user into left4me
The hardening refactor that just landed closes the same-uid attack
surface (FS view, ptrace, /proc visibility, signals) for the web +
gameserver units via systemd directives plus system-wide
kernel.yama.ptrace_scope=2. Keeping the script-sandbox on a separate
uid was the inconsistent half-step — defense-in-depth only, with
build-time-idmap complexity attached. One principle wins: harden
once, share the uid.

scripts/libexec/left4me-script-sandbox: drop the idmap block (uid
lookups, STAGING setup, cleanup_staging trap, mount --bind
--map-users), switch User=/Group= to left4me, point BindPaths at
\$OVERLAY_DIR directly. Header comment updated to reflect
hardening-not-uid as the same-uid defense. nsenter self-wrap kept —
it's about mount-namespace escape, not uid.

Tests + comments + companion docs updated. Build-time-idmap and
overlay-idmap plans marked SUPERSEDED; user-uid-split spec revised
to "1 user is correct"; one-line update notes on the hardening
specs and the build-overlay-unit-design.

Companion ckn-bw commit removes the l4d2-sandbox user + group and
tightens /var/lib/left4me from 0711 → 0755 (the traverse-only mode
was specifically for the sandbox uid).
2026-05-15 15:50:57 +02:00

436 lines
15 KiB
Python

"""Overlay builder registry.
Each `Overlay.type` maps to a builder. `build_overlay(overlay_id)` jobs (and
the synchronous `initialize_server` hook) dispatch through `BUILDERS`. Adding
a new overlay type means writing a new builder and registering it here — no
changes to the worker, the mount layer, or the blueprint editor.
"""
from __future__ import annotations
import os
import subprocess
import tempfile
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import Callable, Protocol
import requests
from sqlalchemy import select
from l4d2host.paths import get_left4me_root
from l4d2web.db import session_scope
from l4d2web.models import Overlay, OverlayWorkshopItem, WorkshopItem
from l4d2web.services.host_commands import run_command
from l4d2web.services.steam_workshop import WorkshopMetadata, download_to_cache
from l4d2web.services.workshop_paths import cache_path, workshop_cache_root
CancelCheck = Callable[[], bool]
LogSink = Callable[[str], None]
SCRIPT_SANDBOX_HELPER = "/usr/local/libexec/left4me/left4me-script-sandbox"
DISK_BUDGET_BYTES = 20 * 1024**3
DOWNLOAD_RETRY_ATTEMPTS = 3
DOWNLOAD_RETRY_BACKOFF_SECONDS = (1.0, 2.0)
assert len(DOWNLOAD_RETRY_BACKOFF_SECONDS) == DOWNLOAD_RETRY_ATTEMPTS - 1
def _sleep_with_cancel(
seconds: float,
should_cancel: CancelCheck,
*,
poll_interval: float = 0.25,
) -> bool:
"""Sleep up to `seconds`, returning early (True) if `should_cancel` becomes
True. Returns False on a full uninterrupted sleep. Polls every
`poll_interval` seconds."""
deadline = time.monotonic() + seconds
while True:
if should_cancel():
return True
remaining = deadline - time.monotonic()
if remaining <= 0:
return False
time.sleep(min(poll_interval, remaining))
def _download_with_retry(
meta: WorkshopMetadata,
cache_root: Path,
*,
on_stderr: LogSink,
should_cancel: CancelCheck,
) -> None:
"""Wrap `download_to_cache` with bounded retries and cancel-aware backoff.
Raises the last exception after `DOWNLOAD_RETRY_ATTEMPTS` failures.
Raises `InterruptedError` if cancelled during a backoff sleep."""
last_exc: BaseException | None = None
for attempt in range(1, DOWNLOAD_RETRY_ATTEMPTS + 1):
try:
download_to_cache(meta, cache_root, should_cancel=should_cancel)
return
except InterruptedError:
raise
except (requests.RequestException, OSError) as exc:
last_exc = exc
if attempt == DOWNLOAD_RETRY_ATTEMPTS:
raise
on_stderr(
f"workshop {meta.steam_id} attempt {attempt}/"
f"{DOWNLOAD_RETRY_ATTEMPTS} failed: {exc}"
)
delay = DOWNLOAD_RETRY_BACKOFF_SECONDS[attempt - 1]
if _sleep_with_cancel(delay, should_cancel):
raise InterruptedError("download cancelled during backoff") from last_exc
def _sandbox_script_dir() -> Path:
"""Where script tmpfiles live before being bind-mounted into the sandbox.
Cannot live in /tmp because the web service unit has PrivateTmp=yes:
its /tmp is a per-instance namespace that PID 1 (which actually performs
the BindReadOnlyPaths during sandbox setup) cannot resolve. /var/lib is
not affected by PrivateTmp and is visible to PID 1, so the bind-mount
succeeds.
"""
return get_left4me_root() / "sandbox-scripts"
class BuildError(RuntimeError):
"""Raised by builders when a build fails for a builder-specific reason
(e.g. disk-budget exceeded). Distinct from subprocess-level
HostCommandError / CommandCancelledError."""
class OverlayBuilder(Protocol):
def build(
self,
overlay: Overlay,
*,
on_stdout: LogSink,
on_stderr: LogSink,
should_cancel: CancelCheck,
) -> None: ...
def _overlay_root(overlay: Overlay) -> Path:
return get_left4me_root() / "overlays" / overlay.path
def overlay_path_for_id(overlay_id: int) -> Path:
return get_left4me_root() / "overlays" / str(overlay_id)
class WorkshopBuilder:
"""Diff-apply symlinks under `left4dead2/addons/` against the overlay's
current `WorkshopItem` associations. Downloads missing or stale items
before applying symlinks. Items with no file_url are skipped with a
warning."""
def build(
self,
overlay: Overlay,
*,
on_stdout: LogSink,
on_stderr: LogSink,
should_cancel: CancelCheck,
) -> None:
addons_dir = _overlay_root(overlay) / "left4dead2" / "addons"
addons_dir.mkdir(parents=True, exist_ok=True)
# Snapshot every field the decision logic + downloader will need.
with session_scope() as db:
rows = db.scalars(
select(WorkshopItem)
.join(
OverlayWorkshopItem,
OverlayWorkshopItem.workshop_item_id == WorkshopItem.id,
)
.where(OverlayWorkshopItem.overlay_id == overlay.id)
).all()
items_data = [
(
it.id,
it.steam_id,
it.title,
it.filename,
it.file_url,
it.file_size,
it.time_updated,
it.preview_url,
it.last_downloaded_at,
it.last_error,
)
for it in rows
]
cache_root = workshop_cache_root()
cache_root.mkdir(parents=True, exist_ok=True)
downloaded = 0
cached = 0
skipped: set[str] = set()
# Download phase.
for (
item_id, steam_id, title, filename, file_url, file_size,
time_updated, preview_url, last_downloaded_at, last_error,
) in items_data:
if should_cancel():
on_stderr("workshop build cancelled during download phase")
return
if not file_url:
on_stderr(
f"workshop item {steam_id} skipped: no file_url "
f"(steam result: {last_error or 'unknown'})"
)
skipped.add(steam_id)
continue
target = cache_path(steam_id)
needs_download = (
last_downloaded_at is None
or not target.exists()
or int(target.stat().st_mtime) != int(time_updated)
or int(target.stat().st_size) != int(file_size)
)
if not needs_download:
cached += 1
continue
# download_to_cache only reads steam_id, file_url, file_size, time_updated;
# consumer_app_id and result are required by the dataclass but unused here.
meta = WorkshopMetadata(
steam_id=steam_id,
title=title,
filename=filename,
file_url=file_url,
file_size=file_size,
time_updated=time_updated,
preview_url=preview_url,
consumer_app_id=550,
result=1,
)
on_stdout(f"workshop {steam_id} downloading")
try:
_download_with_retry(
meta, cache_root,
on_stderr=on_stderr, should_cancel=should_cancel,
)
except InterruptedError:
raise
except Exception as exc:
with session_scope() as db:
wi = db.scalar(select(WorkshopItem).where(WorkshopItem.id == item_id))
if wi is not None:
wi.last_error = f"download failed: {exc}"
raise
with session_scope() as db:
wi = db.scalar(select(WorkshopItem).where(WorkshopItem.id == item_id))
if wi is not None:
wi.last_downloaded_at = datetime.now(UTC)
wi.last_error = ""
downloaded += 1
# Re-snapshot for symlink phase: only items that have a cache file now
# belong in the desired set. Items skipped above stay out.
desired: dict[str, Path] = {}
for (
_item_id, steam_id, _title, _filename, _file_url, _file_size,
_time_updated, _preview_url, _last_downloaded_at, _last_error,
) in items_data:
if steam_id in skipped:
continue
target = cache_path(steam_id)
if not target.exists():
continue # shouldn't happen post-download; safety net
desired[f"{steam_id}.vpk"] = target.resolve()
if should_cancel():
on_stderr("workshop build cancelled before applying symlinks")
return
# existing: symlink-name -> link target (only symlinks pointing at our cache)
existing: dict[str, Path] = {}
for entry in os.scandir(addons_dir):
if not entry.is_symlink():
continue
try:
link_target = Path(os.readlink(entry.path))
except OSError:
continue
try:
resolved = link_target.resolve(strict=False)
except OSError:
continue
if not _is_under(resolved, cache_root):
continue
existing[entry.name] = resolved
created = 0
removed = 0
unchanged = 0
for name, current_target in existing.items():
if should_cancel():
on_stderr("workshop build cancelled mid-removal")
return
desired_target = desired.get(name)
if desired_target is None:
os.unlink(addons_dir / name)
removed += 1
elif current_target != desired_target:
os.unlink(addons_dir / name)
else:
unchanged += 1
post_removal_existing = {
name for name in existing
if name in desired and existing[name] == desired[name]
}
for name, target in desired.items():
if should_cancel():
on_stderr("workshop build cancelled mid-creation")
return
if name in post_removal_existing:
continue
link_path = addons_dir / name
# Defensive: if a non-symlink file collides with our name, leave it.
if link_path.exists() and not link_path.is_symlink():
on_stderr(
f"refusing to overwrite non-symlink at {link_path}; manual intervention required"
)
continue
if link_path.is_symlink():
# An obsolete symlink not in `existing` (target outside cache).
# We don't manage these — leave alone.
on_stderr(
f"refusing to overwrite foreign symlink at {link_path}"
)
continue
os.symlink(target, link_path)
created += 1
on_stdout(
f"workshop overlay {overlay.name!r}: "
f"downloaded={downloaded} cached={cached} skipped={len(skipped)} "
f"created={created} removed={removed} unchanged={unchanged}"
)
def run_sandboxed_script(
overlay_id: int,
script_text: str,
*,
on_stdout: LogSink,
on_stderr: LogSink,
should_cancel: CancelCheck,
) -> None:
"""Write `script_text` to a tmpfile and exec it inside the privileged
sandbox helper. Used by ScriptBuilder.build and by the wipe route."""
script_dir = _sandbox_script_dir()
script_dir.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(
"w", suffix=".sh", delete=False, dir=str(script_dir)
) as f:
f.write(script_text or "")
script_path = f.name
# NamedTemporaryFile creates 0600 owned by the web user; the sandbox runs
# as left4me and needs to read it (bind-mounted at /script.sh inside
# the sandbox). Script content is not a secret — it's plain bash stored
# in the DB and editable by the user — so 0644 is appropriate.
os.chmod(script_path, 0o644)
try:
cmd = [
"sudo",
"-n",
SCRIPT_SANDBOX_HELPER,
str(overlay_id),
script_path,
]
run_command(
cmd,
on_stdout=on_stdout,
on_stderr=on_stderr,
should_cancel=should_cancel,
)
finally:
try:
os.unlink(script_path)
except FileNotFoundError:
pass
class ScriptBuilder:
"""Run an arbitrary user-authored bash script against the overlay dir
inside a hardened systemd-run transient service. The script sees the
overlay dir as RW `/overlay` and a curated host RO mount; everything
else is isolated. After exit, enforce a 20 GB cap on `du -sb /overlay`."""
def build(
self,
overlay: Overlay,
*,
on_stdout: LogSink,
on_stderr: LogSink,
should_cancel: CancelCheck,
) -> None:
# Ensure target dir exists so the helper's bind-mount validation passes.
overlay_path_for_id(overlay.id).mkdir(parents=True, exist_ok=True)
run_sandboxed_script(
overlay.id,
overlay.script or "",
on_stdout=on_stdout,
on_stderr=on_stderr,
should_cancel=should_cancel,
)
self._enforce_disk_budget(overlay.id, on_stderr)
def _enforce_disk_budget(self, overlay_id: int, on_stderr: LogSink) -> None:
target = overlay_path_for_id(overlay_id)
size_output = subprocess.check_output(["du", "-sb", str(target)])
size_bytes = int(size_output.split()[0])
if size_bytes > DISK_BUDGET_BYTES:
on_stderr(
f"overlay exceeded 20 GB disk cap: {size_bytes} bytes > "
f"{DISK_BUDGET_BYTES} bytes"
)
raise BuildError("disk-cap-exceeded")
def _is_under(path: Path, root: Path) -> bool:
try:
path.relative_to(root)
except ValueError:
return False
return True
class FilesBuilder:
"""No-op builder for `files` overlays. Their content IS the overlay
directory — every save / upload / move / delete is immediately
authoritative. The build step exists only so the overlay-build subsystem
can dispatch uniformly across all overlay types; here it simply ensures
the overlay directory exists."""
def build(
self,
overlay: Overlay,
*,
on_stdout: LogSink,
on_stderr: LogSink,
should_cancel: CancelCheck,
) -> None:
overlay_path_for_id(overlay.id).mkdir(parents=True, exist_ok=True)
on_stdout(f"files overlay {overlay.name!r}: directory ensured (no-op build)")
BUILDERS: dict[str, OverlayBuilder] = {
"workshop": WorkshopBuilder(),
"script": ScriptBuilder(),
"files": FilesBuilder(),
}