left4me/l4d2host/instances.py
mwiegand 8971b23617
refactor(sandbox): collapse l4d2-sandbox user into left4me
The hardening refactor that just landed closes the same-uid attack
surface (FS view, ptrace, /proc visibility, signals) for the web +
gameserver units via systemd directives plus system-wide
kernel.yama.ptrace_scope=2. Keeping the script-sandbox on a separate
uid was the inconsistent half-step — defense-in-depth only, with
build-time-idmap complexity attached. One principle wins: harden
once, share the uid.

scripts/libexec/left4me-script-sandbox: drop the idmap block (uid
lookups, STAGING setup, cleanup_staging trap, mount --bind
--map-users), switch User=/Group= to left4me, point BindPaths at
$OVERLAY_DIR directly. Header comment updated to reflect
hardening-not-uid as the same-uid defense. nsenter self-wrap kept —
it's about mount-namespace escape, not uid.

Tests + comments + companion docs updated. Build-time-idmap and
overlay-idmap plans marked SUPERSEDED; user-uid-split spec revised
to "1 user is correct"; one-line update notes on the hardening
specs and the build-overlay-unit-design.

Companion ckn-bw commit removes the l4d2-sandbox user + group and
changes /var/lib/left4me from 0711 → 0755 (the traverse-only mode
was specifically for the sandbox uid).
2026-05-15 15:50:57 +02:00

215 lines
7.3 KiB
Python

from pathlib import Path
import shutil
import subprocess
from typing import Callable
from l4d2host.paths import DEFAULT_LEFT4ME_ROOT, get_left4me_root, overlay_path, validate_instance_name
from l4d2host.service_control import disable_service, enable_service
from l4d2host.spec import load_spec
from l4d2host.logging import emit_step
# Module-level alias for l4d2host.paths.DEFAULT_LEFT4ME_ROOT. None of the
# functions visible in this module read it; presumably kept for importers of
# this module -- verify before removing.
DEFAULT_ROOT = DEFAULT_LEFT4ME_ROOT
def initialize_instance(
    name: str,
    spec_path: Path,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Create the directories and config files for instance ``name``.

    Loads the spec at ``spec_path`` and then, under ``root``:

    * creates ``runtime/<name>/{upper,work,merged}`` and ``instances/<name>``;
    * writes ``instances/<name>/instance.env`` with ``L4D2_PORT``,
      ``L4D2_ARGS`` and ``L4D2_LOWERDIRS``;
    * writes ``instances/<name>/server.cfg`` from ``spec.config``;
    * copies the spec file to ``instances/<name>/spec.yaml``.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        spec_path: Path of the spec file to load and persist.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` for progress messages.
        on_stderr: Accepted for signature parity with the other instance
            operations; not used here.
        passthrough: Forwarded to ``emit_step``.
        should_cancel: Accepted for signature parity; not consulted here.
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)
    spec = load_spec(spec_path)

    emit_step("creating instance directories...", on_stdout, passthrough)
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    for layer in ("upper", "work", "merged"):
        (runtime_dir / layer).mkdir(parents=True, exist_ok=True)
    instance_dir.mkdir(parents=True, exist_ok=True)

    # Spec overlays come first, in spec order; the base installation is
    # always the last entry of the colon-separated list.
    lowerdirs = [str(overlay_path(o.path, root=root)) for o in spec.overlays]
    lowerdirs.append(str(root / "installation"))

    emit_step("writing instance.env...", on_stdout, passthrough)
    instance_env = "\n".join(
        [
            f"L4D2_PORT={spec.port}",
            f"L4D2_ARGS={' '.join(spec.arguments)}",
            f"L4D2_LOWERDIRS={':'.join(lowerdirs)}",
        ]
    ) + "\n"
    # Pin UTF-8: without an explicit encoding, write_text() falls back to the
    # process locale, so the bytes on disk would vary with the environment
    # this runs in.
    (instance_dir / "instance.env").write_text(instance_env, encoding="utf-8")

    emit_step("writing server.cfg...", on_stdout, passthrough)
    # An empty/missing config still produces a (zero-length) server.cfg.
    server_cfg = "\n".join(spec.config) if spec.config else ""
    (instance_dir / "server.cfg").write_text(server_cfg, encoding="utf-8")

    emit_step("persisting spec...", on_stdout, passthrough)
    # Keep a copy of the spec next to the instance; start_instance reloads it.
    shutil.copy2(spec_path, instance_dir / "spec.yaml")

    emit_step("initialization complete.", on_stdout, passthrough)
def start_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Stage cfg files into the instance's upper layer and start its service.

    Removes any ``server*.cfg`` left in ``runtime/<name>/upper/left4dead2/cfg``,
    copies the instance's ``server.cfg`` there, copies each aliased overlay's
    ``left4dead2/cfg/server.cfg`` as ``server_<alias>.cfg``, and then calls
    ``enable_service``.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` and ``enable_service``.
        on_stderr: Forwarded to ``enable_service``.
        passthrough: Forwarded to ``emit_step`` and ``enable_service``.
        should_cancel: Forwarded to ``enable_service``.
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name

    # Stage cfg files in the upper layer. Writing here goes straight to the
    # upper dir on the host filesystem; the unit's ExecStartPre then mounts
    # the overlay (single source of truth for the mount), and the kernel
    # surfaces these files at the top of the merged stack. All overlay
    # content (script-built lowers + this upper stage) is left4me-owned
    # end-to-end, so the kernel's overlay copy-up is uniform — no
    # ownership crossings to reason about.
    emit_step("staging server.cfg + per-overlay aliases in upper layer...", on_stdout, passthrough)
    upper_cfg_dir = runtime_dir / "upper" / "left4dead2" / "cfg"
    upper_cfg_dir.mkdir(parents=True, exist_ok=True)
    # Drop everything staged by a previous start so aliases that were removed
    # from the spec do not linger.
    for stale in upper_cfg_dir.glob("server*.cfg"):
        stale.unlink()
    shutil.copy2(instance_dir / "server.cfg", upper_cfg_dir / "server.cfg")

    spec = load_spec(instance_dir / "spec.yaml")
    for o in spec.overlays:
        if not o.alias:
            continue
        # Resolve the overlay through the same helper initialize_instance uses
        # for L4D2_LOWERDIRS, so the alias cfg is read from the directory that
        # is actually mounted as a lower layer.
        src = overlay_path(o.path, root=root) / "left4dead2" / "cfg" / "server.cfg"
        if not src.exists():
            continue
        shutil.copy2(src, upper_cfg_dir / f"server_{o.alias}.cfg")

    emit_step("enabling + starting systemd service...", on_stdout, passthrough)
    enable_service(
        name,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )
    emit_step("start complete.", on_stdout, passthrough)
def stop_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Disable and stop the service of instance ``name``.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; resolved like in the other operations, but not
            read afterwards in this function.
        on_stdout: Forwarded to ``emit_step`` and ``disable_service``.
        on_stderr: Forwarded to ``disable_service``.
        passthrough: Forwarded to ``emit_step`` and ``disable_service``.
        should_cancel: Forwarded to ``disable_service``.
    """
    name = validate_instance_name(name)
    root = Path(root) if root is not None else get_left4me_root()

    # Unmounting the overlay is owned by the unit file: `disable --now` runs
    # the unit's ExecStopPost, so there is nothing to unmount from Python.
    emit_step("disabling + stopping systemd service...", on_stdout, passthrough)
    forwarded = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }
    disable_service(name, **forwarded)
    emit_step("stop complete.", on_stdout, passthrough)
def _purge_instance(
    name: str,
    *,
    root: Path,
    on_stdout: Callable[[str], None] | None,
    on_stderr: Callable[[str], None] | None,
    passthrough: bool,
    should_cancel: Callable[[], bool] | None,
) -> None:
    """Stop the instance's service (best effort) and delete its directories.

    Shared by ``delete_instance`` and ``reset_instance``. ``name`` and
    ``root`` are used as given; both callers validate/resolve them first.
    Removes ``instances/<name>`` and ``runtime/<name>`` under ``root`` when
    they exist.
    """
    emit_step("disabling + stopping systemd service (if running)...", on_stdout, passthrough)
    try:
        disable_service(
            name,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            passthrough=passthrough,
            should_cancel=should_cancel,
        )
    except subprocess.CalledProcessError:
        # Deliberately ignored: an instance that was initialized but never
        # started has no unit to disable and no mount to clean up. When a
        # unit does exist, `disable --now` runs ExecStopPost, which unmounts.
        pass

    emit_step("removing instance files...", on_stdout, passthrough)
    for subtree in ("instances", "runtime"):
        target = root / subtree / name
        if target.exists():
            shutil.rmtree(target)
def delete_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Delete instance ``name``; a silent no-op when nothing is on disk.

    Returns without emitting any step when neither ``instances/<name>`` nor
    ``runtime/<name>`` exists under ``root``; otherwise delegates to
    ``_purge_instance``.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` and ``_purge_instance``.
        on_stderr: Forwarded to ``_purge_instance``.
        passthrough: Forwarded to ``emit_step`` and ``_purge_instance``.
        should_cancel: Forwarded to ``_purge_instance``.
    """
    name = validate_instance_name(name)
    root = Path(root) if root is not None else get_left4me_root()

    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    if not (instance_dir.exists() or runtime_dir.exists()):
        return

    forwarded = {
        "root": root,
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }
    _purge_instance(name, **forwarded)
    emit_step("delete complete.", on_stdout, passthrough)
def reset_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Purge instance ``name`` unconditionally.

    Unlike ``delete_instance`` there is no existence check first:
    ``_purge_instance`` is always invoked.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` and ``_purge_instance``.
        on_stderr: Forwarded to ``_purge_instance``.
        passthrough: Forwarded to ``emit_step`` and ``_purge_instance``.
        should_cancel: Forwarded to ``_purge_instance``.
    """
    name = validate_instance_name(name)
    root = Path(root) if root is not None else get_left4me_root()

    forwarded = {
        "root": root,
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }
    _purge_instance(name, **forwarded)
    emit_step("reset complete; next start will reinitialize from blueprint.", on_stdout, passthrough)