New privileged helper at /usr/local/libexec/left4me/left4me-overlay (Python, system /usr/bin/python3, stdlib only) takes only the instance name, parses instance.env for L4D2_LOWERDIRS, validates each lowerdir against an allowlist (installation/, overlays/, global_overlay_cache/, workshop_cache/), refuses upperdirs tainted with user.fuseoverlayfs.* xattrs from the prior fuse era, and execs `nsenter --mount=/proc/1/ns/mnt -- mount -t overlay ...` so the resulting mount lives in the host namespace. Mirrors the existing left4me-systemctl / left4me-journalctl pattern; sudoers entry is verb-constrained. KernelOverlayFSMounter implements the existing OverlayMounter ABC, deriving the instance name from the merged path. No call sites use it yet — that's the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
168 lines
5.8 KiB
Python
168 lines
5.8 KiB
Python
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
|
|
HELPER_SOURCE = (
|
|
Path(__file__).resolve().parents[2]
|
|
/ "deploy"
|
|
/ "files"
|
|
/ "usr"
|
|
/ "local"
|
|
/ "libexec"
|
|
/ "left4me"
|
|
/ "left4me-overlay"
|
|
)
|
|
|
|
|
|
def _setup_instance(root: Path, name: str = "alpha", lowerdirs: list[str] | None = None) -> None:
|
|
"""Create the on-disk shape the helper expects."""
|
|
(root / "installation").mkdir(parents=True, exist_ok=True)
|
|
(root / "overlays" / "workshop").mkdir(parents=True, exist_ok=True)
|
|
if lowerdirs is None:
|
|
lowerdirs = [str(root / "overlays" / "workshop"), str(root / "installation")]
|
|
inst_dir = root / "instances" / name
|
|
inst_dir.mkdir(parents=True)
|
|
(inst_dir / "instance.env").write_text(
|
|
f"L4D2_PORT=27015\nL4D2_ARGS=-tickrate 100\nL4D2_LOWERDIRS={':'.join(lowerdirs)}\n"
|
|
)
|
|
runtime = root / "runtime" / name
|
|
(runtime / "upper").mkdir(parents=True)
|
|
(runtime / "work").mkdir(parents=True)
|
|
(runtime / "merged").mkdir(parents=True)
|
|
|
|
|
|
def _run(args: list[str], root: Path, extra_env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
|
|
env = {
|
|
**os.environ,
|
|
"LEFT4ME_ROOT": str(root),
|
|
"LEFT4ME_OVERLAY_PRINT_ONLY": "1",
|
|
}
|
|
if extra_env:
|
|
env.update(extra_env)
|
|
return subprocess.run(
|
|
[sys.executable, str(HELPER_SOURCE), *args],
|
|
env=env,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
|
|
def test_mount_prints_expected_nsenter_command(tmp_path: Path) -> None:
|
|
_setup_instance(tmp_path)
|
|
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
|
|
assert result.returncode == 0, result.stderr
|
|
parts = shlex.split(result.stdout.strip())
|
|
assert parts[0] == "/usr/bin/nsenter"
|
|
assert parts[1] == "--mount=/proc/1/ns/mnt"
|
|
assert parts[2] == "--"
|
|
assert parts[3] == "/bin/mount"
|
|
assert parts[4:6] == ["-t", "overlay"]
|
|
assert parts[6] == "overlay"
|
|
assert parts[7] == "-o"
|
|
options = parts[8]
|
|
assert f"upperdir={tmp_path}/runtime/alpha/upper" in options
|
|
assert f"workdir={tmp_path}/runtime/alpha/work" in options
|
|
assert f"lowerdir={tmp_path}/overlays/workshop:{tmp_path}/installation" in options
|
|
assert parts[9] == str(tmp_path / "runtime" / "alpha" / "merged")
|
|
|
|
|
|
def test_umount_prints_expected_nsenter_command(tmp_path: Path) -> None:
|
|
_setup_instance(tmp_path)
|
|
|
|
result = _run(["umount", "alpha"], tmp_path)
|
|
|
|
assert result.returncode == 0, result.stderr
|
|
parts = shlex.split(result.stdout.strip())
|
|
assert parts == [
|
|
"/usr/bin/nsenter",
|
|
"--mount=/proc/1/ns/mnt",
|
|
"--",
|
|
"/bin/umount",
|
|
str(tmp_path / "runtime" / "alpha" / "merged"),
|
|
]
|
|
|
|
|
|
@pytest.mark.parametrize("bad_name", ["..", "../escape", "FOO", "foo bar", "foo/bar", ""])
|
|
def test_rejects_bad_instance_name(tmp_path: Path, bad_name: str) -> None:
|
|
result = _run(["mount", bad_name], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "invalid instance name" in result.stderr or "usage:" in result.stderr
|
|
|
|
|
|
def test_rejects_lowerdir_outside_allowlist(tmp_path: Path) -> None:
|
|
_setup_instance(tmp_path, lowerdirs=["/etc"])
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "outside the permitted roots" in result.stderr
|
|
|
|
|
|
def test_rejects_lowerdir_traversal(tmp_path: Path) -> None:
|
|
# An overlay subdirectory whose path uses .. to escape the overlays root.
|
|
_setup_instance(tmp_path, lowerdirs=[str(tmp_path / "overlays" / "..") + "/etc"])
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "outside the permitted roots" in result.stderr or "path does not exist" in result.stderr
|
|
|
|
|
|
def test_rejects_lowerdir_symlink_escape(tmp_path: Path) -> None:
|
|
_setup_instance(tmp_path)
|
|
sneaky = tmp_path / "overlays" / "sneaky"
|
|
os.symlink("/etc", sneaky)
|
|
# rewrite instance.env to point at the symlink
|
|
inst_env = tmp_path / "instances" / "alpha" / "instance.env"
|
|
inst_env.write_text(f"L4D2_LOWERDIRS={sneaky}\n")
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "outside the permitted roots" in result.stderr
|
|
|
|
|
|
def test_rejects_missing_instance_env(tmp_path: Path) -> None:
|
|
(tmp_path / "instances" / "alpha").mkdir(parents=True)
|
|
runtime = tmp_path / "runtime" / "alpha"
|
|
(runtime / "upper").mkdir(parents=True)
|
|
(runtime / "work").mkdir(parents=True)
|
|
(runtime / "merged").mkdir(parents=True)
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "instance.env not found" in result.stderr
|
|
|
|
|
|
def test_rejects_lowerdir_count_over_cap(tmp_path: Path) -> None:
|
|
(tmp_path / "installation").mkdir()
|
|
many = [str(tmp_path / "installation")] * 501
|
|
_setup_instance(tmp_path, lowerdirs=many)
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "501 entries" in result.stderr
|
|
|
|
|
|
def test_rejects_empty_lowerdir_entry(tmp_path: Path) -> None:
|
|
(tmp_path / "installation").mkdir()
|
|
_setup_instance(
|
|
tmp_path,
|
|
lowerdirs=[str(tmp_path / "installation"), "", str(tmp_path / "installation")],
|
|
)
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "empty entry" in result.stderr
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only")
|
|
def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None:
|
|
_setup_instance(tmp_path)
|
|
tainted = tmp_path / "runtime" / "alpha" / "upper" / "deleted-thing"
|
|
tainted.write_bytes(b"")
|
|
try:
|
|
os.setxattr(tainted, "user.fuseoverlayfs.opaque", b"y")
|
|
except OSError:
|
|
pytest.skip("filesystem doesn't support user.* xattrs")
|
|
result = _run(["mount", "alpha"], tmp_path)
|
|
assert result.returncode != 0
|
|
assert "fuse-overlayfs xattr" in result.stderr
|