feat(l4d2-host): KernelOverlayFSMounter + left4me-overlay helper
New privileged helper at /usr/local/libexec/left4me/left4me-overlay (Python, system /usr/bin/python3, stdlib only) takes only the instance name, parses instance.env for L4D2_LOWERDIRS, validates each lowerdir against an allowlist (installation/, overlays/, global_overlay_cache/, workshop_cache/), refuses upperdirs tainted with user.fuseoverlayfs.* xattrs from the prior fuse era, and execs `nsenter --mount=/proc/1/ns/mnt -- mount -t overlay ...` so the resulting mount lives in the host namespace. Mirrors the existing left4me-systemctl / left4me-journalctl pattern; sudoers entry is verb-constrained. KernelOverlayFSMounter implements the existing OverlayMounter ABC, deriving the instance name from the merged path. No call sites use it yet — that's the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
db120d77d3
commit
d5b321b557
7 changed files with 515 additions and 1 deletions
|
|
@ -130,7 +130,8 @@ $sudo_cmd cp /opt/left4me/deploy/files/usr/local/lib/systemd/system/left4me-refr
|
|||
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/lib/systemd/system/left4me-refresh-global-overlays.timer /usr/local/lib/systemd/system/left4me-refresh-global-overlays.timer
|
||||
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/libexec/left4me/left4me-systemctl /usr/local/libexec/left4me/left4me-systemctl
|
||||
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/libexec/left4me/left4me-journalctl /usr/local/libexec/left4me/left4me-journalctl
|
||||
$sudo_cmd chmod 0755 /usr/local/libexec/left4me/left4me-systemctl /usr/local/libexec/left4me/left4me-journalctl
|
||||
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/libexec/left4me/left4me-overlay /usr/local/libexec/left4me/left4me-overlay
|
||||
$sudo_cmd chmod 0755 /usr/local/libexec/left4me/left4me-systemctl /usr/local/libexec/left4me/left4me-journalctl /usr/local/libexec/left4me/left4me-overlay
|
||||
$sudo_cmd cp /opt/left4me/deploy/files/etc/sudoers.d/left4me /etc/sudoers.d/left4me
|
||||
$sudo_cmd chmod 0440 /etc/sudoers.d/left4me
|
||||
$sudo_cmd visudo -cf /etc/sudoers.d/left4me
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
Defaults:left4me !requiretty
|
||||
left4me ALL=(root) NOPASSWD: /usr/local/libexec/left4me/left4me-systemctl *
|
||||
left4me ALL=(root) NOPASSWD: /usr/local/libexec/left4me/left4me-journalctl *
|
||||
left4me ALL=(root) NOPASSWD: /usr/local/libexec/left4me/left4me-overlay mount *, /usr/local/libexec/left4me/left4me-overlay umount *
|
||||
|
|
|
|||
188
deploy/files/usr/local/libexec/left4me/left4me-overlay
Normal file
188
deploy/files/usr/local/libexec/left4me/left4me-overlay
Normal file
|
|
@ -0,0 +1,188 @@
|
|||
#!/usr/bin/python3
|
||||
"""Privileged overlay mount helper for left4me.
|
||||
|
||||
Invoked via sudo by the left4me runtime user. Validates inputs strictly,
|
||||
then enters PID 1's mount namespace via nsenter to perform the actual
|
||||
mount/umount syscall, so the resulting mount lives in the host namespace
|
||||
and is visible to the systemd-managed gameserver units.
|
||||
|
||||
Verbs:
|
||||
mount <name> Reads ${LEFT4ME_ROOT}/instances/<name>/instance.env
|
||||
for L4D2_LOWERDIRS, validates every lowerdir is
|
||||
under one of installation/overlays/workshop_cache/
|
||||
global_overlay_cache, then mounts the kernel
|
||||
overlay at runtime/<name>/merged.
|
||||
umount <name> Unmounts runtime/<name>/merged.
|
||||
|
||||
Set LEFT4ME_OVERLAY_PRINT_ONLY=1 to print the would-be argv (one line,
|
||||
shell-quoted) and exit 0 instead of execv. Used by tests.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
|
||||
DEFAULT_ROOT = "/var/lib/left4me"
|
||||
LOWERDIR_ALLOWLIST = (
|
||||
"installation",
|
||||
"overlays",
|
||||
"global_overlay_cache",
|
||||
"workshop_cache",
|
||||
)
|
||||
MAX_LOWERDIRS = 500
|
||||
NSENTER = "/usr/bin/nsenter"
|
||||
MOUNT_BIN = "/bin/mount"
|
||||
UMOUNT_BIN = "/bin/umount"
|
||||
|
||||
|
||||
def die(msg: str) -> None:
|
||||
sys.stderr.write(f"left4me-overlay: {msg}\n")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def root() -> Path:
|
||||
return Path(os.environ.get("LEFT4ME_ROOT") or DEFAULT_ROOT)
|
||||
|
||||
|
||||
def validate_name(name: str) -> str:
|
||||
if not NAME_RE.fullmatch(name):
|
||||
die(f"invalid instance name: {name!r}")
|
||||
return name
|
||||
|
||||
|
||||
def parse_lowerdirs(env_path: Path) -> list[str]:
|
||||
if not env_path.is_file():
|
||||
die(f"instance.env not found: {env_path}")
|
||||
raw = None
|
||||
for line in env_path.read_text().splitlines():
|
||||
if "=" not in line:
|
||||
continue
|
||||
key, value = line.split("=", 1)
|
||||
if key.strip() == "L4D2_LOWERDIRS":
|
||||
raw = value
|
||||
break
|
||||
if raw is None:
|
||||
die(f"L4D2_LOWERDIRS not set in {env_path}")
|
||||
if raw == "":
|
||||
die(f"L4D2_LOWERDIRS is empty in {env_path}")
|
||||
parts = raw.split(":")
|
||||
if any(p == "" for p in parts):
|
||||
die(f"L4D2_LOWERDIRS contains an empty entry: {raw!r}")
|
||||
if len(parts) > MAX_LOWERDIRS:
|
||||
die(f"L4D2_LOWERDIRS has {len(parts)} entries (cap {MAX_LOWERDIRS})")
|
||||
return parts
|
||||
|
||||
|
||||
def canonical_under(allowed_roots: list[Path], path: Path) -> Path:
|
||||
try:
|
||||
canonical = path.resolve(strict=True)
|
||||
except (FileNotFoundError, RuntimeError):
|
||||
die(f"path does not exist or has a symlink loop: {path}")
|
||||
for r in allowed_roots:
|
||||
if canonical == r or r in canonical.parents:
|
||||
return canonical
|
||||
die(f"path is outside the permitted roots: {path} (resolved: {canonical})")
|
||||
|
||||
|
||||
_LISTXATTR = getattr(os, "listxattr", None)
|
||||
|
||||
|
||||
def _entry_has_fuse_xattr(path: str) -> str | None:
|
||||
if _LISTXATTR is None:
|
||||
return None
|
||||
try:
|
||||
attrs = _LISTXATTR(path, follow_symlinks=False)
|
||||
except OSError:
|
||||
return None
|
||||
for a in attrs:
|
||||
if a.startswith("user.fuseoverlayfs."):
|
||||
return a
|
||||
return None
|
||||
|
||||
|
||||
def assert_no_fuse_xattrs(upper: Path) -> None:
|
||||
if not upper.exists() or _LISTXATTR is None:
|
||||
return
|
||||
for dirpath, dirnames, filenames in os.walk(upper):
|
||||
for entry in (dirpath, *(os.path.join(dirpath, n) for n in dirnames),
|
||||
*(os.path.join(dirpath, n) for n in filenames)):
|
||||
tainted = _entry_has_fuse_xattr(entry)
|
||||
if tainted:
|
||||
die(
|
||||
f"upperdir contains fuse-overlayfs xattr {tainted!r} on {entry}; "
|
||||
"wipe upper/ and work/ before mounting"
|
||||
)
|
||||
|
||||
|
||||
def exec_or_print(argv: list[str]) -> None:
|
||||
if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1":
|
||||
print(" ".join(shlex.quote(a) for a in argv))
|
||||
sys.exit(0)
|
||||
os.execv(argv[0], argv)
|
||||
|
||||
|
||||
def cmd_mount(name: str) -> None:
|
||||
name = validate_name(name)
|
||||
r = root()
|
||||
instance_env = r / "instances" / name / "instance.env"
|
||||
raw_lowerdirs = parse_lowerdirs(instance_env)
|
||||
|
||||
allowed_roots = [(r / sub).resolve() for sub in LOWERDIR_ALLOWLIST]
|
||||
canonical_lowerdirs = [str(canonical_under(allowed_roots, Path(p))) for p in raw_lowerdirs]
|
||||
|
||||
runtime_name_dir = (r / "runtime" / name).resolve(strict=True)
|
||||
upper = (runtime_name_dir / "upper").resolve(strict=True)
|
||||
work = (runtime_name_dir / "work").resolve(strict=True)
|
||||
merged = (runtime_name_dir / "merged").resolve(strict=True)
|
||||
for label, path in (("upper", upper), ("work", work), ("merged", merged)):
|
||||
if path.parent != runtime_name_dir:
|
||||
die(f"{label} resolved outside runtime/{name}: {path}")
|
||||
|
||||
assert_no_fuse_xattrs(upper)
|
||||
|
||||
options = f"lowerdir={':'.join(canonical_lowerdirs)},upperdir={upper},workdir={work}"
|
||||
argv = [
|
||||
NSENTER,
|
||||
"--mount=/proc/1/ns/mnt",
|
||||
"--",
|
||||
MOUNT_BIN,
|
||||
"-t", "overlay",
|
||||
"overlay",
|
||||
"-o", options,
|
||||
str(merged),
|
||||
]
|
||||
exec_or_print(argv)
|
||||
|
||||
|
||||
def cmd_umount(name: str) -> None:
|
||||
name = validate_name(name)
|
||||
r = root()
|
||||
runtime_name_dir = (r / "runtime" / name).resolve(strict=True)
|
||||
merged = (runtime_name_dir / "merged").resolve(strict=True)
|
||||
if merged.parent != runtime_name_dir:
|
||||
die(f"merged resolved outside runtime/{name}: {merged}")
|
||||
argv = [
|
||||
NSENTER,
|
||||
"--mount=/proc/1/ns/mnt",
|
||||
"--",
|
||||
UMOUNT_BIN,
|
||||
str(merged),
|
||||
]
|
||||
exec_or_print(argv)
|
||||
|
||||
|
||||
def main(argv: list[str]) -> None:
|
||||
if len(argv) != 3 or argv[1] not in ("mount", "umount"):
|
||||
sys.stderr.write("usage: left4me-overlay mount|umount <name>\n")
|
||||
sys.exit(2)
|
||||
if argv[1] == "mount":
|
||||
cmd_mount(argv[2])
|
||||
else:
|
||||
cmd_umount(argv[2])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv)
|
||||
|
|
@ -13,6 +13,7 @@ GLOBAL_REFRESH_SERVICE = DEPLOY / "files/usr/local/lib/systemd/system/left4me-re
|
|||
GLOBAL_REFRESH_TIMER = DEPLOY / "files/usr/local/lib/systemd/system/left4me-refresh-global-overlays.timer"
|
||||
SYSTEMCTL_HELPER = DEPLOY / "files/usr/local/libexec/left4me/left4me-systemctl"
|
||||
JOURNALCTL_HELPER = DEPLOY / "files/usr/local/libexec/left4me/left4me-journalctl"
|
||||
OVERLAY_HELPER = DEPLOY / "files/usr/local/libexec/left4me/left4me-overlay"
|
||||
SUDOERS = DEPLOY / "files/etc/sudoers.d/left4me"
|
||||
HOST_ENV = DEPLOY / "templates/etc/left4me/host.env"
|
||||
WEB_ENV_TEMPLATE = DEPLOY / "templates/etc/left4me/web.env.template"
|
||||
|
|
@ -146,10 +147,36 @@ def test_sudoers_allows_only_left4me_helpers_not_raw_system_tools():
|
|||
"left4me ALL=(root) NOPASSWD: "
|
||||
"/usr/local/libexec/left4me/left4me-journalctl *"
|
||||
) in sudoers
|
||||
assert "/usr/local/libexec/left4me/left4me-overlay mount *" in sudoers
|
||||
assert "/usr/local/libexec/left4me/left4me-overlay umount *" in sudoers
|
||||
assert "/bin/systemctl" not in sudoers
|
||||
assert "/usr/bin/systemctl" not in sudoers
|
||||
assert "/bin/journalctl" not in sudoers
|
||||
assert "/usr/bin/journalctl" not in sudoers
|
||||
assert "/bin/mount" not in sudoers
|
||||
assert "/bin/umount" not in sudoers
|
||||
|
||||
|
||||
def test_overlay_helper_is_python_with_strict_validation():
|
||||
text = OVERLAY_HELPER.read_text()
|
||||
assert text.startswith("#!/usr/bin/python3")
|
||||
# Validation surface
|
||||
assert "NAME_RE = re.compile" in text
|
||||
assert "LOWERDIR_ALLOWLIST" in text
|
||||
assert "user.fuseoverlayfs." in text
|
||||
assert "MAX_LOWERDIRS = 500" in text
|
||||
# Mounts via PID 1's mount namespace
|
||||
assert "/proc/1/ns/mnt" in text
|
||||
assert "nsenter" in text
|
||||
# Verbs are mount and umount (not unmount)
|
||||
assert '"mount"' in text and '"umount"' in text
|
||||
assert '"unmount"' not in text
|
||||
|
||||
|
||||
def test_deploy_script_installs_overlay_helper_with_executable_mode():
|
||||
script = DEPLOY_SCRIPT.read_text()
|
||||
assert "/usr/local/libexec/left4me/left4me-overlay" in script
|
||||
assert "chmod 0755" in script and "left4me-overlay" in script
|
||||
|
||||
|
||||
def test_env_templates_contain_required_defaults():
|
||||
|
|
|
|||
53
l4d2host/fs/kernel_overlayfs.py
Normal file
53
l4d2host/fs/kernel_overlayfs.py
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from l4d2host.fs.base import OverlayMounter
|
||||
from l4d2host.process import run_command
|
||||
|
||||
|
||||
HELPER_PATH = "/usr/local/libexec/left4me/left4me-overlay"
|
||||
|
||||
|
||||
class KernelOverlayFSMounter(OverlayMounter):
|
||||
# Delegates the actual mount/umount syscalls to the privileged
|
||||
# left4me-overlay helper. The helper takes only the instance name and
|
||||
# rederives lowerdirs/upper/work/merged from disk; the OverlayMounter
|
||||
# ABC accepts those args for compatibility, so we extract the name
|
||||
# from the merged path's parent directory.
|
||||
def mount(
|
||||
self,
|
||||
*,
|
||||
lowerdirs: str,
|
||||
upperdir: Path,
|
||||
workdir: Path,
|
||||
merged: Path,
|
||||
on_stdout: Callable[[str], None] | None = None,
|
||||
on_stderr: Callable[[str], None] | None = None,
|
||||
passthrough: bool = False,
|
||||
should_cancel: Callable[[], bool] | None = None,
|
||||
) -> None:
|
||||
del lowerdirs, upperdir, workdir
|
||||
run_command(
|
||||
["sudo", "-n", HELPER_PATH, "mount", merged.parent.name],
|
||||
on_stdout=on_stdout,
|
||||
on_stderr=on_stderr,
|
||||
passthrough=passthrough,
|
||||
should_cancel=should_cancel,
|
||||
)
|
||||
|
||||
def unmount(
|
||||
self,
|
||||
*,
|
||||
merged: Path,
|
||||
on_stdout: Callable[[str], None] | None = None,
|
||||
on_stderr: Callable[[str], None] | None = None,
|
||||
passthrough: bool = False,
|
||||
should_cancel: Callable[[], bool] | None = None,
|
||||
) -> None:
|
||||
run_command(
|
||||
["sudo", "-n", HELPER_PATH, "umount", merged.parent.name],
|
||||
on_stdout=on_stdout,
|
||||
on_stderr=on_stderr,
|
||||
passthrough=passthrough,
|
||||
should_cancel=should_cancel,
|
||||
)
|
||||
76
l4d2host/tests/test_kernel_overlayfs.py
Normal file
76
l4d2host/tests/test_kernel_overlayfs.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
HELPER_PATH = "/usr/local/libexec/left4me/left4me-overlay"
|
||||
|
||||
|
||||
def test_mount_invokes_helper_with_name_only(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
from l4d2host.fs.kernel_overlayfs import KernelOverlayFSMounter
|
||||
|
||||
calls: list[list[str]] = []
|
||||
|
||||
def fake_run_command(cmd, **kwargs):
|
||||
del kwargs
|
||||
calls.append(list(cmd))
|
||||
|
||||
monkeypatch.setattr("l4d2host.fs.kernel_overlayfs.run_command", fake_run_command)
|
||||
|
||||
KernelOverlayFSMounter().mount(
|
||||
lowerdirs="/var/lib/left4me/installation",
|
||||
upperdir=Path("/var/lib/left4me/runtime/alpha/upper"),
|
||||
workdir=Path("/var/lib/left4me/runtime/alpha/work"),
|
||||
merged=Path("/var/lib/left4me/runtime/alpha/merged"),
|
||||
)
|
||||
|
||||
assert calls == [["sudo", "-n", HELPER_PATH, "mount", "alpha"]]
|
||||
|
||||
|
||||
def test_unmount_invokes_helper_with_umount_verb(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
from l4d2host.fs.kernel_overlayfs import KernelOverlayFSMounter
|
||||
|
||||
calls: list[list[str]] = []
|
||||
|
||||
def fake_run_command(cmd, **kwargs):
|
||||
del kwargs
|
||||
calls.append(list(cmd))
|
||||
|
||||
monkeypatch.setattr("l4d2host.fs.kernel_overlayfs.run_command", fake_run_command)
|
||||
|
||||
KernelOverlayFSMounter().unmount(merged=Path("/var/lib/left4me/runtime/alpha/merged"))
|
||||
|
||||
assert calls == [["sudo", "-n", HELPER_PATH, "umount", "alpha"]]
|
||||
|
||||
|
||||
def test_mount_propagates_run_command_kwargs(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
from l4d2host.fs.kernel_overlayfs import KernelOverlayFSMounter
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_run_command(cmd, **kwargs):
|
||||
captured["cmd"] = list(cmd)
|
||||
captured["kwargs"] = kwargs
|
||||
|
||||
monkeypatch.setattr("l4d2host.fs.kernel_overlayfs.run_command", fake_run_command)
|
||||
|
||||
out: list[str] = []
|
||||
err: list[str] = []
|
||||
KernelOverlayFSMounter().mount(
|
||||
lowerdirs="/var/lib/left4me/installation",
|
||||
upperdir=Path("/var/lib/left4me/runtime/alpha/upper"),
|
||||
workdir=Path("/var/lib/left4me/runtime/alpha/work"),
|
||||
merged=Path("/var/lib/left4me/runtime/alpha/merged"),
|
||||
on_stdout=out.append,
|
||||
on_stderr=err.append,
|
||||
passthrough=False,
|
||||
should_cancel=lambda: False,
|
||||
)
|
||||
|
||||
assert captured["cmd"][0:3] == ["sudo", "-n", HELPER_PATH]
|
||||
captured["kwargs"]["on_stdout"]("hi")
|
||||
captured["kwargs"]["on_stderr"]("oops")
|
||||
assert out == ["hi"]
|
||||
assert err == ["oops"]
|
||||
assert captured["kwargs"]["passthrough"] is False
|
||||
assert callable(captured["kwargs"]["should_cancel"])
|
||||
168
l4d2host/tests/test_overlay_helper.py
Normal file
168
l4d2host/tests/test_overlay_helper.py
Normal file
|
|
@ -0,0 +1,168 @@
|
|||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
HELPER_SOURCE = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "deploy"
|
||||
/ "files"
|
||||
/ "usr"
|
||||
/ "local"
|
||||
/ "libexec"
|
||||
/ "left4me"
|
||||
/ "left4me-overlay"
|
||||
)
|
||||
|
||||
|
||||
def _setup_instance(root: Path, name: str = "alpha", lowerdirs: list[str] | None = None) -> None:
|
||||
"""Create the on-disk shape the helper expects."""
|
||||
(root / "installation").mkdir(parents=True, exist_ok=True)
|
||||
(root / "overlays" / "workshop").mkdir(parents=True, exist_ok=True)
|
||||
if lowerdirs is None:
|
||||
lowerdirs = [str(root / "overlays" / "workshop"), str(root / "installation")]
|
||||
inst_dir = root / "instances" / name
|
||||
inst_dir.mkdir(parents=True)
|
||||
(inst_dir / "instance.env").write_text(
|
||||
f"L4D2_PORT=27015\nL4D2_ARGS=-tickrate 100\nL4D2_LOWERDIRS={':'.join(lowerdirs)}\n"
|
||||
)
|
||||
runtime = root / "runtime" / name
|
||||
(runtime / "upper").mkdir(parents=True)
|
||||
(runtime / "work").mkdir(parents=True)
|
||||
(runtime / "merged").mkdir(parents=True)
|
||||
|
||||
|
||||
def _run(args: list[str], root: Path, extra_env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
|
||||
env = {
|
||||
**os.environ,
|
||||
"LEFT4ME_ROOT": str(root),
|
||||
"LEFT4ME_OVERLAY_PRINT_ONLY": "1",
|
||||
}
|
||||
if extra_env:
|
||||
env.update(extra_env)
|
||||
return subprocess.run(
|
||||
[sys.executable, str(HELPER_SOURCE), *args],
|
||||
env=env,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
|
||||
def test_mount_prints_expected_nsenter_command(tmp_path: Path) -> None:
|
||||
_setup_instance(tmp_path)
|
||||
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
|
||||
assert result.returncode == 0, result.stderr
|
||||
parts = shlex.split(result.stdout.strip())
|
||||
assert parts[0] == "/usr/bin/nsenter"
|
||||
assert parts[1] == "--mount=/proc/1/ns/mnt"
|
||||
assert parts[2] == "--"
|
||||
assert parts[3] == "/bin/mount"
|
||||
assert parts[4:6] == ["-t", "overlay"]
|
||||
assert parts[6] == "overlay"
|
||||
assert parts[7] == "-o"
|
||||
options = parts[8]
|
||||
assert f"upperdir={tmp_path}/runtime/alpha/upper" in options
|
||||
assert f"workdir={tmp_path}/runtime/alpha/work" in options
|
||||
assert f"lowerdir={tmp_path}/overlays/workshop:{tmp_path}/installation" in options
|
||||
assert parts[9] == str(tmp_path / "runtime" / "alpha" / "merged")
|
||||
|
||||
|
||||
def test_umount_prints_expected_nsenter_command(tmp_path: Path) -> None:
|
||||
_setup_instance(tmp_path)
|
||||
|
||||
result = _run(["umount", "alpha"], tmp_path)
|
||||
|
||||
assert result.returncode == 0, result.stderr
|
||||
parts = shlex.split(result.stdout.strip())
|
||||
assert parts == [
|
||||
"/usr/bin/nsenter",
|
||||
"--mount=/proc/1/ns/mnt",
|
||||
"--",
|
||||
"/bin/umount",
|
||||
str(tmp_path / "runtime" / "alpha" / "merged"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("bad_name", ["..", "../escape", "FOO", "foo bar", "foo/bar", ""])
|
||||
def test_rejects_bad_instance_name(tmp_path: Path, bad_name: str) -> None:
|
||||
result = _run(["mount", bad_name], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "invalid instance name" in result.stderr or "usage:" in result.stderr
|
||||
|
||||
|
||||
def test_rejects_lowerdir_outside_allowlist(tmp_path: Path) -> None:
|
||||
_setup_instance(tmp_path, lowerdirs=["/etc"])
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "outside the permitted roots" in result.stderr
|
||||
|
||||
|
||||
def test_rejects_lowerdir_traversal(tmp_path: Path) -> None:
|
||||
# An overlay subdirectory whose path uses .. to escape the overlays root.
|
||||
_setup_instance(tmp_path, lowerdirs=[str(tmp_path / "overlays" / "..") + "/etc"])
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "outside the permitted roots" in result.stderr or "path does not exist" in result.stderr
|
||||
|
||||
|
||||
def test_rejects_lowerdir_symlink_escape(tmp_path: Path) -> None:
|
||||
_setup_instance(tmp_path)
|
||||
sneaky = tmp_path / "overlays" / "sneaky"
|
||||
os.symlink("/etc", sneaky)
|
||||
# rewrite instance.env to point at the symlink
|
||||
inst_env = tmp_path / "instances" / "alpha" / "instance.env"
|
||||
inst_env.write_text(f"L4D2_LOWERDIRS={sneaky}\n")
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "outside the permitted roots" in result.stderr
|
||||
|
||||
|
||||
def test_rejects_missing_instance_env(tmp_path: Path) -> None:
|
||||
(tmp_path / "instances" / "alpha").mkdir(parents=True)
|
||||
runtime = tmp_path / "runtime" / "alpha"
|
||||
(runtime / "upper").mkdir(parents=True)
|
||||
(runtime / "work").mkdir(parents=True)
|
||||
(runtime / "merged").mkdir(parents=True)
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "instance.env not found" in result.stderr
|
||||
|
||||
|
||||
def test_rejects_lowerdir_count_over_cap(tmp_path: Path) -> None:
|
||||
(tmp_path / "installation").mkdir()
|
||||
many = [str(tmp_path / "installation")] * 501
|
||||
_setup_instance(tmp_path, lowerdirs=many)
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "501 entries" in result.stderr
|
||||
|
||||
|
||||
def test_rejects_empty_lowerdir_entry(tmp_path: Path) -> None:
|
||||
(tmp_path / "installation").mkdir()
|
||||
_setup_instance(
|
||||
tmp_path,
|
||||
lowerdirs=[str(tmp_path / "installation"), "", str(tmp_path / "installation")],
|
||||
)
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "empty entry" in result.stderr
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only")
|
||||
def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None:
|
||||
_setup_instance(tmp_path)
|
||||
tainted = tmp_path / "runtime" / "alpha" / "upper" / "deleted-thing"
|
||||
tainted.write_bytes(b"")
|
||||
try:
|
||||
os.setxattr(tainted, "user.fuseoverlayfs.opaque", b"y")
|
||||
except OSError:
|
||||
pytest.skip("filesystem doesn't support user.* xattrs")
|
||||
result = _run(["mount", "alpha"], tmp_path)
|
||||
assert result.returncode != 0
|
||||
assert "fuse-overlayfs xattr" in result.stderr
|
||||
Loading…
Reference in a new issue