left4me/l4d2host/tests/test_overlay_helper.py
mwiegand d5b321b557
feat(l4d2-host): KernelOverlayFSMounter + left4me-overlay helper
New privileged helper at /usr/local/libexec/left4me/left4me-overlay
(Python, system /usr/bin/python3, stdlib only) takes only the instance
name, parses instance.env for L4D2_LOWERDIRS, validates each lowerdir
against an allowlist (installation/, overlays/, global_overlay_cache/,
workshop_cache/), refuses upperdirs tainted with user.fuseoverlayfs.*
xattrs from the prior fuse era, and execs `nsenter --mount=/proc/1/ns/mnt
-- mount -t overlay ...` so the resulting mount lives in the host
namespace. Mirrors the existing left4me-systemctl / left4me-journalctl
pattern; sudoers entry is verb-constrained.

KernelOverlayFSMounter implements the existing OverlayMounter ABC,
deriving the instance name from the merged path. No call sites use it
yet — that's the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:23:58 +02:00

168 lines
5.8 KiB
Python

import os
import shlex
import subprocess
import sys
from pathlib import Path
import pytest
# Absolute path of the left4me-overlay helper script inside the deploy tree.
# It is located relative to this test module: ``parents[2]`` is the directory
# two levels above the one that contains this file.
HELPER_SOURCE = Path(__file__).resolve().parents[2].joinpath(
    "deploy",
    "files",
    "usr",
    "local",
    "libexec",
    "left4me",
    "left4me-overlay",
)
def _setup_instance(root: Path, name: str = "alpha", lowerdirs: list[str] | None = None) -> None:
"""Create the on-disk shape the helper expects."""
(root / "installation").mkdir(parents=True, exist_ok=True)
(root / "overlays" / "workshop").mkdir(parents=True, exist_ok=True)
if lowerdirs is None:
lowerdirs = [str(root / "overlays" / "workshop"), str(root / "installation")]
inst_dir = root / "instances" / name
inst_dir.mkdir(parents=True)
(inst_dir / "instance.env").write_text(
f"L4D2_PORT=27015\nL4D2_ARGS=-tickrate 100\nL4D2_LOWERDIRS={':'.join(lowerdirs)}\n"
)
runtime = root / "runtime" / name
(runtime / "upper").mkdir(parents=True)
(runtime / "work").mkdir(parents=True)
(runtime / "merged").mkdir(parents=True)
def _run(args: list[str], root: Path, extra_env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
    """Run the helper script in a subprocess and capture its output.

    The script at ``HELPER_SOURCE`` is executed with the interpreter running
    the tests. The child inherits the current environment with
    ``LEFT4ME_ROOT`` set to *root* and ``LEFT4ME_OVERLAY_PRINT_ONLY`` set to
    ``"1"``; the tests in this module read the resulting command line from
    stdout.

    Args:
        args: Command-line arguments passed to the helper.
        root: Directory exported to the helper as ``LEFT4ME_ROOT``.
        extra_env: Optional variables applied last, so they can override
            the defaults above.

    Returns:
        The completed process, with stdout and stderr captured as text.
    """
    child_env = dict(os.environ)
    child_env["LEFT4ME_ROOT"] = str(root)
    child_env["LEFT4ME_OVERLAY_PRINT_ONLY"] = "1"
    child_env.update(extra_env or {})

    command = [sys.executable, str(HELPER_SOURCE)] + list(args)
    return subprocess.run(command, env=child_env, capture_output=True, text=True)
def test_mount_prints_expected_nsenter_command(tmp_path: Path) -> None:
    """``mount`` emits an nsenter-wrapped overlay mount for the instance."""
    _setup_instance(tmp_path)

    proc = _run(["mount", "alpha"], tmp_path)
    assert proc.returncode == 0, proc.stderr

    argv = shlex.split(proc.stdout.strip())

    # Fixed prefix: enter PID 1's mount namespace, then mount an overlay.
    assert argv[:8] == [
        "/usr/bin/nsenter",
        "--mount=/proc/1/ns/mnt",
        "--",
        "/bin/mount",
        "-t",
        "overlay",
        "overlay",
        "-o",
    ]

    # The -o value must carry all three overlay directories; the order of
    # the fragments inside the option string is not checked.
    mount_options = argv[8]
    expected_fragments = (
        f"upperdir={tmp_path}/runtime/alpha/upper",
        f"workdir={tmp_path}/runtime/alpha/work",
        f"lowerdir={tmp_path}/overlays/workshop:{tmp_path}/installation",
    )
    for fragment in expected_fragments:
        assert fragment in mount_options

    merged_dir = tmp_path / "runtime" / "alpha" / "merged"
    assert argv[9] == str(merged_dir)
def test_umount_prints_expected_nsenter_command(tmp_path: Path) -> None:
    """``umount`` emits exactly an nsenter-wrapped umount of the merged dir."""
    _setup_instance(tmp_path)

    proc = _run(["umount", "alpha"], tmp_path)
    assert proc.returncode == 0, proc.stderr

    merged_dir = tmp_path / "runtime" / "alpha" / "merged"
    expected_argv = [
        "/usr/bin/nsenter",
        "--mount=/proc/1/ns/mnt",
        "--",
        "/bin/umount",
        str(merged_dir),
    ]
    assert shlex.split(proc.stdout.strip()) == expected_argv
@pytest.mark.parametrize(
    "bad_name",
    [
        "..",
        "../escape",
        "FOO",
        "foo bar",
        "foo/bar",
        "",
    ],
)
def test_rejects_bad_instance_name(tmp_path: Path, bad_name: str) -> None:
    """Malformed instance names fail with a name or usage error."""
    proc = _run(["mount", bad_name], tmp_path)

    assert proc.returncode != 0
    # Either diagnostic is accepted.
    accepted_messages = ("invalid instance name", "usage:")
    assert any(message in proc.stderr for message in accepted_messages)
def test_rejects_lowerdir_outside_allowlist(tmp_path: Path) -> None:
    """A lowerdir that is not under any permitted root is refused."""
    forbidden_layers = ["/etc"]
    _setup_instance(tmp_path, lowerdirs=forbidden_layers)

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "outside the permitted roots" in proc.stderr
def test_rejects_lowerdir_traversal(tmp_path: Path) -> None:
    """A lowerdir that climbs out of ``overlays/`` via ``..`` is refused."""
    # pathlib keeps the ".." component verbatim, so the resulting string is
    # "<root>/overlays/../etc".
    escaping_layer = f"{tmp_path / 'overlays' / '..'}/etc"
    _setup_instance(tmp_path, lowerdirs=[escaping_layer])

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    # Either diagnostic is accepted.
    accepted_messages = ("outside the permitted roots", "path does not exist")
    assert any(message in proc.stderr for message in accepted_messages)
def test_rejects_lowerdir_symlink_escape(tmp_path: Path) -> None:
    """A symlink under ``overlays/`` that points at ``/etc`` is refused."""
    _setup_instance(tmp_path)

    # The link itself sits inside a permitted root, but its target does not.
    escape_link = tmp_path / "overlays" / "sneaky"
    escape_link.symlink_to("/etc")

    # Replace the generated instance.env so the link is the only lowerdir.
    env_file = tmp_path / "instances" / "alpha" / "instance.env"
    env_file.write_text(f"L4D2_LOWERDIRS={escape_link}\n")

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "outside the permitted roots" in proc.stderr
def test_rejects_missing_instance_env(tmp_path: Path) -> None:
    """Mounting fails when the instance directory has no ``instance.env``."""
    # Build the instance and runtime directories by hand, deliberately
    # leaving out the instance.env file.
    (tmp_path / "instances" / "alpha").mkdir(parents=True)
    runtime_dir = tmp_path / "runtime" / "alpha"
    for leaf in ("upper", "work", "merged"):
        (runtime_dir / leaf).mkdir(parents=True)

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "instance.env not found" in proc.stderr
def test_rejects_lowerdir_count_over_cap(tmp_path: Path) -> None:
    """A lowerdir list with 501 entries is refused."""
    # NOTE(review): the cap is presumably 500 entries — confirm against the helper.
    install_dir = tmp_path / "installation"
    install_dir.mkdir()
    too_many_layers = [str(install_dir) for _ in range(501)]
    _setup_instance(tmp_path, lowerdirs=too_many_layers)

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "501 entries" in proc.stderr
def test_rejects_empty_lowerdir_entry(tmp_path: Path) -> None:
    """An empty element inside ``L4D2_LOWERDIRS`` is refused."""
    install_dir = tmp_path / "installation"
    install_dir.mkdir()
    # The empty string in the middle becomes "::" once joined with ":".
    layers_with_gap = [str(install_dir), "", str(install_dir)]
    _setup_instance(tmp_path, lowerdirs=layers_with_gap)

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "empty entry" in proc.stderr
@pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only")
def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None:
    """An upperdir holding a ``user.fuseoverlayfs.*`` xattr is refused."""
    _setup_instance(tmp_path)

    # Plant an empty file in the upperdir and tag it with a fuse-overlayfs
    # extended attribute.
    upper_dir = tmp_path / "runtime" / "alpha" / "upper"
    marked_file = upper_dir / "deleted-thing"
    marked_file.write_bytes(b"")
    try:
        os.setxattr(marked_file, "user.fuseoverlayfs.opaque", b"y")
    except OSError:
        # Not every filesystem backing tmp_path accepts user.* xattrs.
        pytest.skip("filesystem doesn't support user.* xattrs")

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "fuse-overlayfs xattr" in proc.stderr