Each linked overlay gets a checkbox on the blueprint detail page that opts
its server.cfg in, making it executable as `exec server_overlay_<id>`. The
web app builds the spec with {path, alias} per overlay and prepends
`exec server_overlay_<id>` lines to the blueprint config in
lowest-overlay-first order. The host stages copies of those overlay cfg
files in the overlayfs upper layer before mounting (this avoids copy-up
writes against a file owned by the sandbox uid). A live preview block above
the Config textarea shows what gets auto-executed.
Schema:
- alembic 0007: BlueprintOverlay.expose_server_cfg BOOLEAN
Spec contract:
- l4d2host OverlayRef(path, alias?). load_spec accepts both bare-string
and {path, alias} entries.
Side effects folded in (same file in l4d2_facade):
- start_server auto-initializes; the manual Initialize step is no longer
needed before Start.
- initialize_server no longer runs blueprint builders — builds happen on
overlay save, not on every server Start.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
270 lines
8.7 KiB
Python
270 lines
8.7 KiB
Python
import os
|
|
from pathlib import Path
|
|
import shutil
|
|
import subprocess
|
|
from typing import Callable
|
|
|
|
from l4d2host.fs.kernel_overlayfs import KernelOverlayFSMounter
|
|
from l4d2host.paths import DEFAULT_LEFT4ME_ROOT, get_left4me_root, overlay_path, validate_instance_name
|
|
from l4d2host.service_control import start_service, stop_service
|
|
from l4d2host.spec import load_spec
|
|
|
|
|
|
from l4d2host.logging import emit_step
|
|
|
|
|
|
# Module-level mounter instance shared by start_instance, stop_instance and
# _purge_instance for mounting/unmounting the per-instance runtime overlay.
_mounter = KernelOverlayFSMounter()
|
|
|
|
|
|
# Module-level alias of DEFAULT_LEFT4ME_ROOT. Not referenced by the functions
# visible in this module — presumably kept for importers; TODO confirm.
DEFAULT_ROOT = DEFAULT_LEFT4ME_ROOT
|
|
|
|
|
|
def initialize_instance(
    name: str,
    spec_path: Path,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Write the on-disk state for instance *name* from the spec at *spec_path*.

    Creates ``<root>/runtime/<name>/{upper,work,merged}`` and
    ``<root>/instances/<name>``, then writes three files into the instance
    directory:

    * ``instance.env`` -- ``L4D2_PORT``, ``L4D2_ARGS`` (space-joined) and
      ``L4D2_LOWERDIRS`` (colon-joined overlay paths in spec order, followed
      by ``<root>/installation``);
    * ``server.cfg`` -- the spec's config lines joined with newlines (empty
      when the spec has no config);
    * ``spec.yaml`` -- a copy of *spec_path*.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        spec_path: Path of the spec file to load and persist.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` for progress messages.
        on_stderr: Accepted for signature parity; not used here.
        passthrough: Forwarded to ``emit_step``.
        should_cancel: Accepted for signature parity; not used here.
    """
    name = validate_instance_name(name)
    if root is None:
        root = get_left4me_root()
    else:
        root = Path(root)
    spec = load_spec(spec_path)

    def step(message: str) -> None:
        # Every progress line goes through emit_step with the same sinks.
        emit_step(message, on_stdout, passthrough)

    step("creating instance directories...")
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    for layer in ("upper", "work", "merged"):
        (runtime_dir / layer).mkdir(parents=True, exist_ok=True)
    instance_dir.mkdir(parents=True, exist_ok=True)

    # Overlays come first in spec order; the base installation is always the
    # last entry of the lowerdir list.
    lowerdirs = [
        *(str(overlay_path(ref.path, root=root)) for ref in spec.overlays),
        str(root / "installation"),
    ]

    step("writing instance.env...")
    joined_args = " ".join(spec.arguments)
    joined_lowerdirs = ":".join(lowerdirs)
    env_lines = (
        f"L4D2_PORT={spec.port}",
        f"L4D2_ARGS={joined_args}",
        f"L4D2_LOWERDIRS={joined_lowerdirs}",
    )
    env_file = instance_dir / "instance.env"
    env_file.write_text("".join(f"{line}\n" for line in env_lines))

    step("writing server.cfg...")
    cfg_text = ""
    if spec.config:
        cfg_text = "\n".join(spec.config)
    cfg_file = instance_dir / "server.cfg"
    cfg_file.write_text(cfg_text)

    step("persisting spec...")
    shutil.copy2(spec_path, instance_dir / "spec.yaml")
    step("initialization complete.")
|
|
|
|
|
|
def _load_instance_env(path: Path) -> dict[str, str]:
    """Parse the ``KEY=VALUE`` file at *path* into a dict.

    Lines without an ``=`` are skipped. Every other line is split at its
    first ``=``, so values may themselves contain ``=``. Keys and values are
    taken verbatim (no stripping, unquoting or comment handling); when a key
    repeats, the last occurrence wins.
    """
    triples = (raw.partition("=") for raw in path.read_text().splitlines())
    # partition() yields an empty separator when the line has no "=".
    return {key: value for key, sep, value in triples if sep}
|
|
|
|
|
|
def start_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Stage cfg files, mount the runtime overlay and start the service.

    Reads ``instance.env``, ``server.cfg`` and ``spec.yaml`` from
    ``<root>/instances/<name>``, so the instance must already have been
    initialized. The steps are, in order:

    1. Refuse to continue if ``<root>/runtime/<name>/merged`` is already a
       mount point.
    2. Remove every ``server*.cfg`` from the upper layer's
       ``left4dead2/cfg`` directory, copy the instance ``server.cfg`` there,
       and copy each aliased overlay's ``server.cfg`` as
       ``server_<alias>.cfg``. Overlays without an alias, or without a
       ``server.cfg``, are skipped.
    3. Mount the overlay using ``L4D2_LOWERDIRS`` from ``instance.env``.
    4. Start the service via ``start_service``.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Progress/output sink, forwarded to the mounter and
            ``start_service``.
        on_stderr: Error-output sink, forwarded likewise.
        passthrough: Forwarded to ``emit_step``, the mounter and
            ``start_service``.
        should_cancel: Cancellation probe, forwarded to the mounter and
            ``start_service``.

    Raises:
        subprocess.CalledProcessError: If the runtime overlay is already
            mounted (raised here with ``returncode=1``).
    """
    name = validate_instance_name(name)
    root = get_left4me_root() if root is None else Path(root)
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name

    env = _load_instance_env(instance_dir / "instance.env")

    merged = runtime_dir / "merged"
    if os.path.ismount(merged):
        # Kernel overlayfs mounts persist when the web worker dies (unlike
        # fuse daemons, which were reaped with their cgroup). Refuse rather
        # than double-mount.
        raise subprocess.CalledProcessError(
            returncode=1,
            cmd=["start_instance"],
            stderr=f"runtime overlay already mounted at {merged}; refusing to double-mount",
        )

    # Stage cfg files in the upper layer BEFORE mounting. Writing through
    # merged after the mount triggers overlayfs copy-up, which preserves the
    # lower file's ownership — and a script-sandbox-built `server.cfg` is
    # owned by `l4d2-sandbox`, not the worker. Pre-mount writes go straight to
    # upper with the worker's uid; the kernel just shows them at the top of
    # the merged stack once mounted.
    emit_step("staging server.cfg + per-overlay aliases in upper layer...", on_stdout, passthrough)
    upper_cfg_dir = runtime_dir / "upper" / "left4dead2" / "cfg"
    upper_cfg_dir.mkdir(parents=True, exist_ok=True)
    # Drop cfg copies staged by a previous start so aliases that are no
    # longer in the spec do not linger in the upper layer.
    for stale in upper_cfg_dir.glob("server*.cfg"):
        stale.unlink()
    shutil.copy2(instance_dir / "server.cfg", upper_cfg_dir / "server.cfg")
    spec = load_spec(instance_dir / "spec.yaml")
    for o in spec.overlays:
        if not o.alias:
            continue
        # Resolve the overlay directory with the same helper that
        # initialize_instance uses to build L4D2_LOWERDIRS, so the staged
        # copy always comes from the directory that is actually mounted.
        src = Path(overlay_path(o.path, root=root)) / "left4dead2" / "cfg" / "server.cfg"
        if not src.exists():
            continue
        shutil.copy2(src, upper_cfg_dir / f"server_{o.alias}.cfg")

    emit_step("mounting runtime overlay...", on_stdout, passthrough)
    _mounter.mount(
        lowerdirs=env["L4D2_LOWERDIRS"],
        upperdir=runtime_dir / "upper",
        workdir=runtime_dir / "work",
        merged=merged,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )

    # NOTE(review): if start_service raises, the overlay stays mounted and a
    # later start_instance is refused by the ismount guard above until
    # stop_instance unmounts it — confirm callers handle that path.
    emit_step("starting systemd service...", on_stdout, passthrough)
    start_service(
        name,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        passthrough=passthrough,
        should_cancel=should_cancel,
    )
    emit_step("start complete.", on_stdout, passthrough)
|
|
|
|
|
|
def stop_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Stop the instance's service, then unmount its runtime overlay.

    A ``subprocess.CalledProcessError`` from ``stop_service`` propagates to
    the caller; the same error from the unmount is swallowed, so the unmount
    is best-effort.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Progress/output sink, forwarded to both operations.
        on_stderr: Error-output sink, forwarded to both operations.
        passthrough: Forwarded to ``emit_step`` and both operations.
        should_cancel: Cancellation probe, forwarded to both operations.
    """
    name = validate_instance_name(name)
    if root is None:
        root = get_left4me_root()
    else:
        root = Path(root)

    # The same callback/flag set is handed to every delegated call.
    io_kwargs = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }

    emit_step("stopping systemd service...", on_stdout, passthrough)
    stop_service(name, **io_kwargs)

    emit_step("unmounting runtime overlay (if mounted)...", on_stdout, passthrough)
    merged_dir = root / "runtime" / name / "merged"
    try:
        _mounter.unmount(merged=merged_dir, **io_kwargs)
    except subprocess.CalledProcessError:
        # Best-effort: a failed unmount does not fail the stop.
        pass
    emit_step("stop complete.", on_stdout, passthrough)
|
|
|
|
|
|
def _purge_instance(
    name: str,
    *,
    root: Path,
    on_stdout: Callable[[str], None] | None,
    on_stderr: Callable[[str], None] | None,
    passthrough: bool,
    should_cancel: Callable[[], bool] | None,
) -> None:
    """Tear down instance *name* and delete its directories under *root*.

    Stops the service and unmounts the runtime overlay, ignoring a
    ``subprocess.CalledProcessError`` from either step, then removes
    ``<root>/instances/<name>`` and ``<root>/runtime/<name>`` when they
    exist. *name* is used as given; this helper does not validate it.
    """
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    io_kwargs = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }

    # Best-effort teardown steps, run in order; each is announced first and a
    # CalledProcessError from it is ignored.
    teardown_steps = (
        (
            "stopping systemd service (if running)...",
            lambda: stop_service(name, **io_kwargs),
        ),
        (
            "unmounting runtime overlay (if mounted)...",
            lambda: _mounter.unmount(merged=runtime_dir / "merged", **io_kwargs),
        ),
    )
    for message, action in teardown_steps:
        emit_step(message, on_stdout, passthrough)
        try:
            action()
        except subprocess.CalledProcessError:
            pass

    emit_step("removing instance files...", on_stdout, passthrough)
    for directory in (instance_dir, runtime_dir):
        if directory.exists():
            shutil.rmtree(directory)
|
|
|
|
|
|
def delete_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Delete instance *name*, doing nothing when it has no files on disk.

    Returns silently (no progress output) if neither
    ``<root>/instances/<name>`` nor ``<root>/runtime/<name>`` exists;
    otherwise delegates to ``_purge_instance`` and reports completion.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Progress/output sink, forwarded to ``_purge_instance``.
        on_stderr: Error-output sink, forwarded to ``_purge_instance``.
        passthrough: Forwarded to ``emit_step`` and ``_purge_instance``.
        should_cancel: Cancellation probe, forwarded to ``_purge_instance``.
    """
    name = validate_instance_name(name)
    if root is None:
        root = get_left4me_root()
    else:
        root = Path(root)

    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    has_files = instance_dir.exists() or runtime_dir.exists()
    if not has_files:
        return

    purge_kwargs = {
        "root": root,
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }
    _purge_instance(name, **purge_kwargs)
    emit_step("delete complete.", on_stdout, passthrough)
|
|
|
|
|
|
def reset_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Purge instance *name* unconditionally and report the reset.

    Unlike ``delete_instance`` there is no existence check: the purge (stop,
    unmount, remove directories) always runs. This function does not
    re-create anything itself.

    Args:
        name: Instance name; passed through ``validate_instance_name``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Progress/output sink, forwarded to ``_purge_instance``.
        on_stderr: Error-output sink, forwarded to ``_purge_instance``.
        passthrough: Forwarded to ``emit_step`` and ``_purge_instance``.
        should_cancel: Cancellation probe, forwarded to ``_purge_instance``.
    """
    name = validate_instance_name(name)
    if root is None:
        root = get_left4me_root()
    else:
        root = Path(root)

    purge_kwargs = {
        "root": root,
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }
    _purge_instance(name, **purge_kwargs)
    emit_step("reset complete; next start will reinitialize from blueprint.", on_stdout, passthrough)
|