"""Lifecycle operations for server instances: initialize, start, stop, delete, reset.

Each public function validates the instance name, resolves the left4me root
(falling back to ``get_left4me_root()`` when ``root`` is ``None``) and reports
progress through ``emit_step``.

On-disk layout used by this module, relative to the root:

* ``instances/<name>/`` -- ``instance.env``, ``server.cfg`` and ``spec.yaml``
* ``runtime/<name>/``   -- ``upper``, ``work`` and ``merged`` overlay dirs
* ``installation``      -- appended as the last lowerdir
"""

from pathlib import Path
import shutil
import subprocess
from typing import Callable

from l4d2host.fs.kernel_overlayfs import KernelOverlayFSMounter
from l4d2host.paths import DEFAULT_LEFT4ME_ROOT, get_left4me_root, overlay_path, validate_instance_name
from l4d2host.service_control import disable_service, enable_service
from l4d2host.spec import load_spec
from l4d2host.logging import emit_step


# Module-wide mounter; this module only ever calls its ``unmount``.
_mounter = KernelOverlayFSMounter()

# Alias of the package default root, kept for callers that import it from here.
DEFAULT_ROOT = DEFAULT_LEFT4ME_ROOT


def _unmount_best_effort(
    merged: Path,
    *,
    on_stdout: Callable[[str], None] | None,
    on_stderr: Callable[[str], None] | None,
    passthrough: bool,
    should_cancel: Callable[[], bool] | None,
) -> None:
    """Unmount the overlay at ``merged``, ignoring a failing unmount command."""
    emit_step("unmounting runtime overlay (if mounted)...", on_stdout, passthrough)
    try:
        _mounter.unmount(
            merged=merged,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            passthrough=passthrough,
            should_cancel=should_cancel,
        )
    except subprocess.CalledProcessError:
        # Deliberate best-effort: presumably the usual cause is that nothing
        # is mounted at ``merged`` -- verify against the mounter. Any other
        # exception type still propagates.
        pass


def initialize_instance(
    name: str,
    spec_path: Path,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Create the directories and config files for instance ``name``.

    Loads the spec at ``spec_path``, creates the runtime overlay directories
    and the instance directory, then writes ``instance.env`` and
    ``server.cfg`` and copies the spec to ``spec.yaml`` in the instance
    directory.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        spec_path: Path of the spec file to load and persist.
        root: Left4me root; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` for progress messages.
        on_stderr: Accepted for signature parity with the other lifecycle
            functions; not used here.
        passthrough: Forwarded to ``emit_step``.
        should_cancel: Accepted for signature parity; not used here.
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)
    spec = load_spec(spec_path)

    emit_step("creating instance directories...", on_stdout, passthrough)
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    for layer in ("upper", "work", "merged"):
        (runtime_dir / layer).mkdir(parents=True, exist_ok=True)
    instance_dir.mkdir(parents=True, exist_ok=True)

    # Overlays come first, in spec order; the base installation is always the
    # last entry of the colon-separated list.
    lowerdirs = [str(overlay_path(o.path, root=root)) for o in spec.overlays]
    lowerdirs.append(str(root / "installation"))

    emit_step("writing instance.env...", on_stdout, passthrough)
    env_lines = [
        f"L4D2_PORT={spec.port}",
        f"L4D2_ARGS={' '.join(spec.arguments)}",
        f"L4D2_LOWERDIRS={':'.join(lowerdirs)}",
    ]
    # Explicit UTF-8 so the bytes on disk do not depend on the locale of the
    # process that happens to run this.
    (instance_dir / "instance.env").write_text("\n".join(env_lines) + "\n", encoding="utf-8")

    emit_step("writing server.cfg...", on_stdout, passthrough)
    # A falsy ``spec.config`` produces an empty file rather than an error.
    server_cfg = "\n".join(spec.config) if spec.config else ""
    (instance_dir / "server.cfg").write_text(server_cfg, encoding="utf-8")

    emit_step("persisting spec...", on_stdout, passthrough)
    shutil.copy2(spec_path, instance_dir / "spec.yaml")

    emit_step("initialization complete.", on_stdout, passthrough)


def start_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Stage cfg files into the upper layer and enable the instance's service.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Left4me root; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` and ``enable_service``.
        on_stderr: Forwarded to ``enable_service``.
        passthrough: Forwarded to ``emit_step`` and ``enable_service``.
        should_cancel: Forwarded to ``enable_service``.

    Raises:
        FileNotFoundError: From ``shutil.copy2`` when the instance's
            ``server.cfg`` does not exist (e.g. the instance was never
            initialized).
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name

    # Stage cfg files in the upper layer. Writing here goes straight to the
    # upper dir on the host filesystem with the worker's uid; the unit's
    # ExecStartPre then mounts the overlay (single source of truth for the
    # mount), and the kernel surfaces these files at the top of the merged
    # stack. A script-sandbox-built lower-layer `server.cfg` is owned by
    # `l4d2-sandbox`, not the worker — staging in upper sidesteps the
    # ownership-preserving copy-up that would happen if we wrote through
    # merged post-mount.
    emit_step("staging server.cfg + per-overlay aliases in upper layer...", on_stdout, passthrough)
    upper_cfg_dir = runtime_dir / "upper" / "left4dead2" / "cfg"
    upper_cfg_dir.mkdir(parents=True, exist_ok=True)

    # Drop everything staged by a previous start so aliases of overlays that
    # are no longer in the spec do not linger.
    for stale in upper_cfg_dir.glob("server*.cfg"):
        stale.unlink()
    shutil.copy2(instance_dir / "server.cfg", upper_cfg_dir / "server.cfg")

    spec = load_spec(instance_dir / "spec.yaml")
    for o in spec.overlays:
        if not o.alias:
            continue
        # Resolve the overlay with the same helper initialize_instance used
        # for the lowerdir list, so the alias cfg comes from the directory
        # that is actually part of the mount.
        overlay_cfg = Path(overlay_path(o.path, root=root)) / "left4dead2" / "cfg" / "server.cfg"
        if not overlay_cfg.exists():
            continue
        shutil.copy2(overlay_cfg, upper_cfg_dir / f"server_{o.alias}.cfg")

    emit_step("enabling + starting systemd service...", on_stdout, passthrough)
    enable_service(
        name,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )
    emit_step("start complete.", on_stdout, passthrough)


def stop_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Disable the instance's service, then unmount its runtime overlay.

    An exception from ``disable_service`` propagates to the caller; a failing
    unmount command (``subprocess.CalledProcessError``) is ignored.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Left4me root; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step``, ``disable_service`` and the
            mounter.
        on_stderr: Forwarded to ``disable_service`` and the mounter.
        passthrough: Forwarded to ``emit_step``, ``disable_service`` and the
            mounter.
        should_cancel: Forwarded to ``disable_service`` and the mounter.
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)

    emit_step("disabling + stopping systemd service...", on_stdout, passthrough)
    disable_service(
        name,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )

    _unmount_best_effort(
        root / "runtime" / name / "merged",
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )
    emit_step("stop complete.", on_stdout, passthrough)


def _purge_instance(
    name: str,
    *,
    root: Path,
    on_stdout: Callable[[str], None] | None,
    on_stderr: Callable[[str], None] | None,
    passthrough: bool,
    should_cancel: Callable[[], bool] | None,
) -> None:
    """Tear down instance ``name`` and remove its instance and runtime dirs.

    Shared by ``delete_instance`` and ``reset_instance``. ``name`` and
    ``root`` are used as given; callers validate and resolve them first.
    Unlike ``stop_instance``, a ``subprocess.CalledProcessError`` from
    ``disable_service`` is ignored here so that removal still proceeds.
    """
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name

    emit_step("disabling + stopping systemd service (if running)...", on_stdout, passthrough)
    try:
        disable_service(
            name,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            passthrough=passthrough,
            should_cancel=should_cancel,
        )
    except subprocess.CalledProcessError:
        # Deliberate best-effort: a service that cannot be disabled must not
        # block removal of the instance's files.
        pass

    _unmount_best_effort(
        runtime_dir / "merged",
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )

    emit_step("removing instance files...", on_stdout, passthrough)
    for directory in (instance_dir, runtime_dir):
        if directory.exists():
            shutil.rmtree(directory)


def delete_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Remove instance ``name``; return silently if it has no directories.

    When neither the instance directory nor the runtime directory exists the
    function returns without emitting any step. Otherwise it stops the
    service, unmounts the overlay and removes both directories.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Left4me root; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` and the teardown helpers.
        on_stderr: Forwarded to the teardown helpers.
        passthrough: Forwarded to ``emit_step`` and the teardown helpers.
        should_cancel: Forwarded to the teardown helpers.
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)

    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    if not (instance_dir.exists() or runtime_dir.exists()):
        return

    _purge_instance(
        name,
        root=root,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )
    emit_step("delete complete.", on_stdout, passthrough)


def reset_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Purge instance ``name`` unconditionally.

    Unlike ``delete_instance`` there is no early return: the service disable
    and unmount are attempted even when no directories exist. Nothing is
    re-created here; the final step message states that the next start
    reinitializes the instance, which happens outside this function.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Left4me root; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` and the teardown helpers.
        on_stderr: Forwarded to the teardown helpers.
        passthrough: Forwarded to ``emit_step`` and the teardown helpers.
        should_cancel: Forwarded to the teardown helpers.
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)
    _purge_instance(
        name,
        root=root,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )
    emit_step("reset complete; next start will reinitialize from blueprint.", on_stdout, passthrough)