left4me/l4d2host/instances.py
mwiegand f81e839ba2
security: harden boundary inputs and production defaults
- validate instance names at the host lib and web boundary against
  [a-z0-9][a-z0-9_-]{0,63} to prevent path traversal via Server.name
- fail-closed on SECRET_KEY: load_config returns None when env unset,
  create_app raises if missing or "dev" outside TESTING
- close login timing oracle by hashing a dummy digest when the user
  is not found, equalizing response time
- set SESSION_COOKIE_SECURE outside TESTING
- delete_instance tolerates stop_service and fusermount3 failures so
  partially-initialized instances clean up without contract breaks;
  drops the is_mount() preflight that violated AGENTS.md
- document claim_next_job's single-process assumption
- clarify emit_step contract via docstring

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 00:53:33 +02:00

195 lines
6.2 KiB
Python

from pathlib import Path
import shutil
import subprocess
from typing import Callable
from l4d2host.paths import DEFAULT_LEFT4ME_ROOT, get_left4me_root, overlay_path, validate_instance_name
from l4d2host.process import run_command
from l4d2host.service_control import start_service, stop_service
from l4d2host.spec import load_spec
from l4d2host.logging import emit_step
# Module-level alias for the default Left4Me root imported from l4d2host.paths.
# NOTE(review): not referenced by the functions below, which resolve their root
# through get_left4me_root(); presumably kept so callers can import DEFAULT_ROOT
# from this module - confirm before removing.
DEFAULT_ROOT = DEFAULT_LEFT4ME_ROOT
def initialize_instance(
    name: str,
    spec_path: Path,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Create the on-disk layout and config files for instance *name*.

    The name is passed through ``validate_instance_name`` and the spec is
    loaded from *spec_path* before anything is written. The function then:

    1. creates ``runtime/<name>/{upper,work,merged}`` and ``instances/<name>``
       under the root (existing directories are accepted);
    2. writes ``instances/<name>/instance.env`` with ``L4D2_PORT``,
       ``L4D2_ARGS`` (arguments joined by spaces) and ``L4D2_LOWERDIRS``
       (the spec's overlay paths followed by ``<root>/installation``,
       joined by ``:``);
    3. writes ``instances/<name>/server.cfg`` from the spec's config lines
       joined by newlines, or an empty file when there are none.

    Args:
        name: Instance name; validated before use in any path.
        spec_path: Path handed to ``load_spec``.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step`` for progress messages.
        on_stderr: Accepted but not used by this function.
        passthrough: Forwarded to ``emit_step``.
        should_cancel: Accepted but not used by this function.
    """
    name = validate_instance_name(name)
    if root is None:
        root = get_left4me_root()
    else:
        root = Path(root)
    spec = load_spec(spec_path)

    emit_step("creating instance directories...", on_stdout, passthrough)
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    for layer in ("upper", "work", "merged"):
        (runtime_dir / layer).mkdir(parents=True, exist_ok=True)
    instance_dir.mkdir(parents=True, exist_ok=True)

    # Spec overlays come first; the base installation is always the last layer.
    lowerdirs = []
    for overlay in spec.overlays:
        lowerdirs.append(str(overlay_path(overlay, root=root)))
    lowerdirs.append(str(root / "installation"))

    emit_step("writing instance.env...", on_stdout, passthrough)
    env_lines = (
        f"L4D2_PORT={spec.port}",
        f"L4D2_ARGS={' '.join(spec.arguments)}",
        f"L4D2_LOWERDIRS={':'.join(lowerdirs)}",
    )
    # Every entry, including the last one, is newline-terminated.
    env_text = "".join(f"{entry}\n" for entry in env_lines)
    (instance_dir / "instance.env").write_text(env_text)

    emit_step("writing server.cfg...", on_stdout, passthrough)
    if spec.config:
        cfg_text = "\n".join(spec.config)
    else:
        cfg_text = ""
    (instance_dir / "server.cfg").write_text(cfg_text)

    emit_step("initialization complete.", on_stdout, passthrough)
def _load_instance_env(path: Path) -> dict[str, str]:
    """Read ``KEY=VALUE`` lines from *path* into a dictionary.

    Lines without an ``=`` are skipped. Each remaining line is split at its
    first ``=`` only, so values may themselves contain ``=``. Keys and values
    are stored verbatim (no stripping or unquoting); when a key occurs more
    than once the last value wins.
    """
    pairs = (
        entry.partition("=")
        for entry in path.read_text().splitlines()
        if "=" in entry
    )
    return {key: value for key, _sep, value in pairs}
def start_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Mount the runtime overlay for instance *name* and start its service.

    Steps, in order:

    1. validate the name and read ``instances/<name>/instance.env``;
    2. run ``fuse-overlayfs`` with ``lowerdir`` taken from the
       ``L4D2_LOWERDIRS`` entry and ``upperdir``/``workdir`` under
       ``runtime/<name>``, mounting onto ``runtime/<name>/merged``;
    3. copy ``instances/<name>/server.cfg`` to
       ``merged/left4dead2/cfg/server.cfg`` (creating the parent directory);
    4. call ``start_service`` for the instance.

    Args:
        name: Instance name; validated before use in any path.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step``, ``run_command`` and
            ``start_service``.
        on_stderr: Forwarded to ``run_command`` and ``start_service``.
        passthrough: Forwarded to the same helpers.
        should_cancel: Forwarded to ``run_command`` and ``start_service``.

    Raises:
        KeyError: If ``instance.env`` has no ``L4D2_LOWERDIRS`` entry.
    """
    name = validate_instance_name(name)
    root = Path(root) if root is not None else get_left4me_root()
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name
    merged_dir = runtime_dir / "merged"
    env = _load_instance_env(instance_dir / "instance.env")

    # Keyword arguments shared by every helper that streams command output.
    stream_kwargs = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }

    emit_step("mounting runtime overlay...", on_stdout, passthrough)
    mount_options = ",".join(
        (
            f"lowerdir={env['L4D2_LOWERDIRS']}",
            f"upperdir={runtime_dir / 'upper'}",
            f"workdir={runtime_dir / 'work'}",
        )
    )
    run_command(
        ["fuse-overlayfs", "-o", mount_options, str(merged_dir)],
        **stream_kwargs,
    )

    emit_step("copying server.cfg to runtime...", on_stdout, passthrough)
    cfg_destination = merged_dir / "left4dead2" / "cfg" / "server.cfg"
    cfg_destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(instance_dir / "server.cfg", cfg_destination)

    emit_step("starting systemd service...", on_stdout, passthrough)
    start_service(name, **stream_kwargs)

    emit_step("start complete.", on_stdout, passthrough)
def stop_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Stop the service of instance *name* and unmount its runtime overlay.

    Calls ``stop_service`` first, then runs ``fusermount3 -u`` on
    ``runtime/<name>/merged``. Neither call is wrapped in error handling
    here, so whatever those helpers raise propagates to the caller.

    Args:
        name: Instance name; validated before use in any path.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step``, ``stop_service`` and
            ``run_command``.
        on_stderr: Forwarded to ``stop_service`` and ``run_command``.
        passthrough: Forwarded to the same helpers.
        should_cancel: Forwarded to ``stop_service`` and ``run_command``.
    """
    name = validate_instance_name(name)
    root = Path(root) if root is not None else get_left4me_root()

    # Keyword arguments shared by both helper invocations.
    stream_kwargs = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }

    emit_step("stopping systemd service...", on_stdout, passthrough)
    stop_service(name, **stream_kwargs)

    emit_step("unmounting runtime overlay...", on_stdout, passthrough)
    merged_dir = root / "runtime" / name / "merged"
    run_command(["fusermount3", "-u", str(merged_dir)], **stream_kwargs)

    emit_step("stop complete.", on_stdout, passthrough)
def delete_instance(
    name: str,
    *,
    root: Path | None = None,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    passthrough: bool = False,
    should_cancel: Callable[[], bool] | None = None,
) -> None:
    """Remove instance *name*: stop it, unmount it, delete its directories.

    Returns immediately, without emitting any step, when neither
    ``instances/<name>`` nor ``runtime/<name>`` exists. Otherwise the service
    stop and the ``fusermount3 -u`` of ``runtime/<name>/merged`` are both
    attempted on a best-effort basis: a ``subprocess.CalledProcessError``
    from either one is ignored so that removal still proceeds. Other
    exception types are not caught. Finally whichever of the two directories
    exists is removed with ``shutil.rmtree``.

    Args:
        name: Instance name; validated before use in any path.
        root: Base directory; ``get_left4me_root()`` is used when ``None``.
        on_stdout: Forwarded to ``emit_step``, ``stop_service`` and
            ``run_command``.
        on_stderr: Forwarded to ``stop_service`` and ``run_command``.
        passthrough: Forwarded to the same helpers.
        should_cancel: Forwarded to ``stop_service`` and ``run_command``.
    """
    name = validate_instance_name(name)
    root = Path(root) if root is not None else get_left4me_root()
    instance_dir = root / "instances" / name
    runtime_dir = root / "runtime" / name

    # Nothing on disk for this instance: treat the delete as already done.
    if not (instance_dir.exists() or runtime_dir.exists()):
        return

    # Keyword arguments shared by both helper invocations.
    stream_kwargs = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "passthrough": passthrough,
        "should_cancel": should_cancel,
    }

    emit_step("stopping systemd service (if running)...", on_stdout, passthrough)
    try:
        stop_service(name, **stream_kwargs)
    except subprocess.CalledProcessError:
        # Best effort: a failed stop must not block the cleanup.
        pass

    emit_step("unmounting runtime overlay (if mounted)...", on_stdout, passthrough)
    try:
        run_command(
            ["fusermount3", "-u", str(runtime_dir / "merged")],
            **stream_kwargs,
        )
    except subprocess.CalledProcessError:
        # Best effort: a failed unmount must not block the cleanup.
        pass

    emit_step("removing instance files...", on_stdout, passthrough)
    for directory in (instance_dir, runtime_dir):
        if directory.exists():
            shutil.rmtree(directory)

    emit_step("delete complete.", on_stdout, passthrough)