left4me/l4d2host/tests/test_overlay_helper.py
mwiegand dd918aca4b
fix(left4me-overlay): use /proc/self/mountinfo to detect bind mounts
os.path.ismount() compares st_dev against the parent dir, which silently
returns False for same-fs bind mounts. The idmap binds at runtime/<n>/
idmap/<basename> are exactly that case, so:

- cmd_umount skipped the bind-umount step every stop, leaving orphan
  binds in PID 1's mount namespace.
- cmd_mount's idempotency check then "didn't see" the orphan and
  re-bound on top, accumulating one mount per start/stop cycle.

Findmnt nesting like
    /var/lib/left4me/runtime/2/idmap/overlays_9
    └─/var/lib/left4me/runtime/2/idmap/overlays_9
is the visible symptom. Reboot wipes everything so the bug is invisible
on a fresh boot — only stop/start cycles accumulate.

Replace both ismount sites with a _is_mountpoint() helper that reads
/proc/self/mountinfo (column 5 is the mount point). Keep os.path.ismount
for the overlay merged check, where it's reliable (distinct fs type).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 01:02:18 +02:00

349 lines
13 KiB
Python

import os
import shlex
import subprocess
import sys
from pathlib import Path
import pytest
# Location of the left4me-overlay helper script exercised by these tests,
# resolved relative to this test file (parents[2] of the resolved path, then
# deploy/files/usr/local/libexec/left4me/left4me-overlay).
HELPER_SOURCE = Path(__file__).resolve().parents[2].joinpath(
    "deploy",
    "files",
    "usr",
    "local",
    "libexec",
    "left4me",
    "left4me-overlay",
)
def _setup_instance(root: Path, name: str = "alpha", lowerdirs: list[str] | None = None) -> None:
"""Create the on-disk shape the helper expects."""
(root / "installation").mkdir(parents=True, exist_ok=True)
(root / "overlays" / "workshop").mkdir(parents=True, exist_ok=True)
if lowerdirs is None:
lowerdirs = [str(root / "overlays" / "workshop"), str(root / "installation")]
inst_dir = root / "instances" / name
inst_dir.mkdir(parents=True)
(inst_dir / "instance.env").write_text(
f"L4D2_PORT=27015\nL4D2_ARGS=-tickrate 100\nL4D2_LOWERDIRS={':'.join(lowerdirs)}\n"
)
runtime = root / "runtime" / name
(runtime / "upper").mkdir(parents=True)
(runtime / "work").mkdir(parents=True)
(runtime / "merged").mkdir(parents=True)
def _run(args: list[str], root: Path, extra_env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
    """Run the helper script with the current interpreter and capture its output.

    The child inherits the current environment, overridden with
    ``LEFT4ME_ROOT`` pointing at *root*, ``LEFT4ME_OVERLAY_PRINT_ONLY=1`` and
    the synthetic ``LEFT4ME_TEST_*`` uid/gid values. Entries in *extra_env*
    are applied last and therefore win over everything else.

    Args:
        args: Command-line arguments passed to the helper.
        root: Directory exported to the helper as ``LEFT4ME_ROOT``.
        extra_env: Optional additional environment overrides.

    Returns:
        The completed process, with stdout and stderr captured as text.
    """
    overrides = {
        "LEFT4ME_ROOT": str(root),
        "LEFT4ME_OVERLAY_PRINT_ONLY": "1",
        # Inject synthetic user ids so tests work without real system users.
        "LEFT4ME_TEST_SANDBOX_UID": str(FAKE_SANDBOX_UID),
        "LEFT4ME_TEST_SANDBOX_GID": str(FAKE_SANDBOX_GID),
        "LEFT4ME_TEST_LEFT4ME_UID": str(FAKE_LEFT4ME_UID),
        "LEFT4ME_TEST_LEFT4ME_GID": str(FAKE_LEFT4ME_GID),
    }
    child_env = dict(os.environ)
    child_env.update(overrides)
    child_env.update(extra_env or {})

    command = [sys.executable, str(HELPER_SOURCE)] + list(args)
    return subprocess.run(command, env=child_env, capture_output=True, text=True)
def test_mount_prints_expected_command(tmp_path: Path) -> None:
    """The helper invokes /bin/mount directly.

    nsenter into PID 1's mount namespace happens at the systemd Exec line
    (see the unit file), so by the time the helper runs, the syscall already
    lands in the right namespace.
    """
    _setup_instance(tmp_path)

    result = _run(["mount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv = shlex.split(result.stdout.strip())
    # Expected shape: /bin/mount -t overlay overlay -o <options> <merged>
    assert argv[:5] == ["/bin/mount", "-t", "overlay", "overlay", "-o"]

    options = argv[5]
    expected_fragments = (
        f"upperdir={tmp_path}/runtime/alpha/upper",
        f"workdir={tmp_path}/runtime/alpha/work",
        f"lowerdir={tmp_path}/overlays/workshop:{tmp_path}/installation",
    )
    for fragment in expected_fragments:
        assert fragment in options

    assert argv[6] == str(tmp_path / "runtime" / "alpha" / "merged")
def test_umount_prints_expected_command(tmp_path: Path) -> None:
    """Same as the mount path: the helper invokes /bin/umount directly.

    It relies on the unit-level nsenter to put it in PID 1's namespace.
    """
    _setup_instance(tmp_path)

    result = _run(["umount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    merged = tmp_path / "runtime" / "alpha" / "merged"
    expected_argv = ["/bin/umount", str(merged)]
    assert shlex.split(result.stdout.strip()) == expected_argv
@pytest.mark.parametrize("bad_name", ["..", "../escape", "FOO", "foo bar", "foo/bar", ""])
def test_rejects_bad_instance_name(tmp_path: Path, bad_name: str) -> None:
    """Malformed instance names fail with an invalid-name or usage message."""
    result = _run(["mount", bad_name], tmp_path)
    assert result.returncode != 0
    accepted_messages = ("invalid instance name", "usage:")
    assert any(message in result.stderr for message in accepted_messages)
def test_rejects_lowerdir_outside_allowlist(tmp_path: Path) -> None:
    """A lowerdir of /etc is refused as being outside the permitted roots."""
    _setup_instance(tmp_path, lowerdirs=["/etc"])

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "outside the permitted roots" in proc.stderr
def test_rejects_lowerdir_traversal(tmp_path: Path) -> None:
    """A lowerdir that uses ``..`` to climb out of the overlays root is refused."""
    # pathlib keeps the ".." component verbatim, so the helper receives
    # <root>/overlays/../etc exactly as written.
    escaping = tmp_path / "overlays" / ".." / "etc"
    _setup_instance(tmp_path, lowerdirs=[str(escaping)])

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    accepted_messages = ("outside the permitted roots", "path does not exist")
    assert any(message in proc.stderr for message in accepted_messages)
def test_rejects_lowerdir_symlink_escape(tmp_path: Path) -> None:
    """A symlink inside overlays/ that points at /etc is refused as a lowerdir."""
    _setup_instance(tmp_path)

    link = tmp_path / "overlays" / "sneaky"
    link.symlink_to("/etc")

    # Rewrite instance.env so the only lowerdir is the symlink.
    env_file = tmp_path / "instances" / "alpha" / "instance.env"
    env_file.write_text(f"L4D2_LOWERDIRS={link}\n")

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "outside the permitted roots" in proc.stderr
def test_rejects_missing_instance_env(tmp_path: Path) -> None:
    """Mounting fails when the instance directory has no instance.env file."""
    # Build the instance and runtime directories by hand, deliberately
    # leaving out instance.env.
    (tmp_path / "instances" / "alpha").mkdir(parents=True)
    runtime_dir = tmp_path / "runtime" / "alpha"
    for subdir in ("upper", "work", "merged"):
        (runtime_dir / subdir).mkdir(parents=True)

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "instance.env not found" in proc.stderr
def test_rejects_lowerdir_count_over_cap(tmp_path: Path) -> None:
    """An instance.env listing 501 lowerdir entries is rejected."""
    installation = tmp_path / "installation"
    installation.mkdir()
    entries = [str(installation) for _ in range(501)]
    _setup_instance(tmp_path, lowerdirs=entries)

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "501 entries" in proc.stderr
def test_rejects_empty_lowerdir_entry(tmp_path: Path) -> None:
    """An empty element in the colon-separated lowerdir list is rejected."""
    installation = tmp_path / "installation"
    installation.mkdir()
    valid_entry = str(installation)
    # The empty string in the middle yields "…::…" once joined with ":".
    _setup_instance(tmp_path, lowerdirs=[valid_entry, "", valid_entry])

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "empty entry" in proc.stderr
# Synthetic uid/gid values handed to the helper through the LEFT4ME_TEST_*
# environment variables, so the tests do not need real system users.
FAKE_SANDBOX_UID = FAKE_SANDBOX_GID = 7001
FAKE_LEFT4ME_UID = FAKE_LEFT4ME_GID = 7002
def _setup_instance_with_uid(
    root: Path,
    name: str = "alpha",
    lowerdir_uid: int = FAKE_LEFT4ME_UID,
    lowerdir_gid: int = FAKE_LEFT4ME_GID,
) -> Path:
    """Like _setup_instance, but chown the workshop lowerdir to a given uid/gid.

    Args:
        root: Directory used as the left4me root for the test.
        name: Instance name used for the ``instances/`` and ``runtime/``
            subdirectories.
        lowerdir_uid: Owner uid to apply to ``overlays/workshop``.
        lowerdir_gid: Owner gid to apply to ``overlays/workshop``.

    Returns:
        The path of the ``overlays/workshop`` directory.
    """
    workshop = root / "overlays" / "workshop"
    workshop.mkdir(parents=True, exist_ok=True)
    try:
        os.chown(workshop, lowerdir_uid, lowerdir_gid)
    except PermissionError:
        # Best effort: without root the chown fails and the uid will not
        # match; that is fine for the "skips idmap" test.
        pass

    installation = root / "installation"
    installation.mkdir(parents=True, exist_ok=True)

    instance_dir = root / "instances" / name
    instance_dir.mkdir(parents=True, exist_ok=True)
    lowerdirs = [str(workshop), str(installation)]
    (instance_dir / "instance.env").write_text(
        f"L4D2_LOWERDIRS={':'.join(lowerdirs)}\n"
    )

    runtime_dir = root / "runtime" / name
    for subdir in ("upper", "work", "merged"):
        (runtime_dir / subdir).mkdir(parents=True, exist_ok=True)

    return workshop
def test_mount_idmaps_sandbox_owned_lowerdir(tmp_path: Path) -> None:
    """A lowerdir owned by the l4d2-sandbox uid triggers an idmap bind mount.

    The overlay ``lowerdir=`` string must reference the idmap path, not the
    raw overlay path. A ``mount --bind --map-users/--map-groups`` argv must be
    emitted before the overlay mount argv.
    """
    overlay_dir = _setup_instance_with_uid(
        tmp_path, lowerdir_uid=FAKE_SANDBOX_UID, lowerdir_gid=FAKE_SANDBOX_GID
    )
    # The setup helper swallows a failed chown, so repeat it here to find out
    # whether this runner can actually change ownership.
    try:
        os.chown(overlay_dir, FAKE_SANDBOX_UID, FAKE_SANDBOX_GID)
    except PermissionError:
        pytest.skip("chown requires root — skip on unprivileged runner")

    result = _run(["mount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv_lines = [line for line in result.stdout.splitlines() if line.strip()]
    assert len(argv_lines) == 2, f"expected 2 argv lines, got: {result.stdout!r}"
    bind_line, overlay_line = argv_lines

    # First line: the idmapped bind mount of the raw overlay directory.
    bind_argv = shlex.split(bind_line)
    assert bind_argv[0] == "/bin/mount"
    assert "--bind" in bind_argv
    assert f"--map-users={FAKE_SANDBOX_UID}:{FAKE_LEFT4ME_UID}:1" in bind_argv
    assert f"--map-groups={FAKE_SANDBOX_GID}:{FAKE_LEFT4ME_GID}:1" in bind_argv
    assert bind_argv[-2] == str(overlay_dir)
    idmap_target = str(tmp_path / "runtime" / "alpha" / "idmap" / "overlays_workshop")
    assert bind_argv[-1] == idmap_target

    # Second line: the overlay mount, whose lowerdir starts at the idmap path.
    overlay_argv = shlex.split(overlay_line)
    assert overlay_argv[0] == "/bin/mount"
    assert overlay_argv[1:3] == ["-t", "overlay"]
    options = overlay_argv[5]
    assert f"lowerdir={idmap_target}:" in options, \
        f"lowerdir should start with idmap path; got: {options!r}"
    assert str(overlay_dir) not in options, \
        f"raw overlay path should not appear in lowerdir; got: {options!r}"
def test_mount_skips_idmap_for_left4me_owned_lowerdir(tmp_path: Path) -> None:
    """A lowerdir already owned by the left4me uid needs no idmap bind mount."""
    overlay_dir = _setup_instance_with_uid(
        tmp_path, lowerdir_uid=FAKE_LEFT4ME_UID, lowerdir_gid=FAKE_LEFT4ME_GID
    )
    # Best-effort chown to the left4me uid.
    try:
        os.chown(overlay_dir, FAKE_LEFT4ME_UID, FAKE_LEFT4ME_GID)
    except PermissionError:
        # Without root, st_uid is 0 or our own uid; neither matches
        # FAKE_SANDBOX_UID, so the helper will correctly skip the idmap bind
        # either way.
        pass

    result = _run(["mount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv_lines = [line for line in result.stdout.splitlines() if line.strip()]
    assert len(argv_lines) == 1, f"expected 1 argv line (no bind mount), got: {result.stdout!r}"

    overlay_argv = shlex.split(argv_lines[0])
    assert overlay_argv[0] == "/bin/mount"
    assert "--bind" not in overlay_argv

    options = overlay_argv[5]
    idmap_subdir = str(tmp_path / "runtime" / "alpha" / "idmap")
    assert idmap_subdir not in options, f"idmap path should not appear; got: {options!r}"
    assert str(overlay_dir) in options
def test_umount_unwinds_idmap_binds(tmp_path: Path) -> None:
    """umount emits bind-umount lines for each idmap subdir, after the overlay umount."""
    _setup_instance(tmp_path)
    runtime_dir = tmp_path / "runtime" / "alpha"

    # Pre-seed an idmap subdir as if a previous mount had set it up.
    idmap_root = runtime_dir / "idmap"
    idmap_root.mkdir(parents=True)
    seeded_bind = idmap_root / "workshop"
    seeded_bind.mkdir()

    result = _run(["umount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv_lines = [line for line in result.stdout.splitlines() if line.strip()]
    assert len(argv_lines) >= 2, f"expected at least 2 argv lines, got: {result.stdout!r}"

    # First line: overlay umount.
    assert shlex.split(argv_lines[0]) == ["/bin/umount", str(runtime_dir / "merged")]

    # Next line: bind umount of the seeded idmap subdir.
    bind_umount_argv = shlex.split(argv_lines[1])
    assert bind_umount_argv[0] == "/bin/umount"
    assert bind_umount_argv[-1] == str(seeded_bind)
@pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only")
def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None:
    """Mounting fails when a file in the upperdir carries a user.fuseoverlayfs.* xattr."""
    _setup_instance(tmp_path)

    # Drop an empty file into the upperdir and tag it with the xattr.
    marked_file = tmp_path / "runtime" / "alpha" / "upper" / "deleted-thing"
    marked_file.touch()
    try:
        os.setxattr(marked_file, "user.fuseoverlayfs.opaque", b"y")
    except OSError:
        pytest.skip("filesystem doesn't support user.* xattrs")

    proc = _run(["mount", "alpha"], tmp_path)

    assert proc.returncode != 0
    assert "fuse-overlayfs xattr" in proc.stderr
def _load_helper_module():
    """Import the helper script as a Python module for unit testing internals.

    The helper file has no .py extension, so importlib needs an explicit
    SourceFileLoader rather than auto-detection.

    Returns:
        The module object produced by executing the helper source.
    """
    import importlib.machinery
    import importlib.util

    module_name = "left4me_overlay"
    source_loader = importlib.machinery.SourceFileLoader(module_name, str(HELPER_SOURCE))
    spec = importlib.util.spec_from_loader(module_name, source_loader)
    assert spec is not None

    helper_module = importlib.util.module_from_spec(spec)
    source_loader.exec_module(helper_module)
    return helper_module
def test_is_mountpoint_detects_same_fs_bind_mount(tmp_path: Path) -> None:
    """_is_mountpoint reads /proc/self/mountinfo so it works for same-fs bind mounts.

    Regression: os.path.ismount() compares st_dev against the parent, which
    silently returns False for same-fs bind mounts. The idmap binds we install
    on runtime/<n>/idmap/<basename> are exactly that case, so an ismount-based
    check skipped umount on stop and re-bound on top on start — accumulating
    mount-table entries across stop/start cycles.
    """
    helper = _load_helper_module()

    bind_dir = tmp_path / "some-bind"
    bind_dir.mkdir()
    resolved_bind = str(bind_dir.resolve())

    # mountinfo column 5 is the mountpoint; build minimal lines that exercise
    # the parse without depending on the rest of the format.
    fake_mountinfo = tmp_path / "fake-mountinfo"
    mountinfo_lines = (
        f"42 1 0:30 / {resolved_bind} rw,relatime - tmpfs tmpfs rw\n",
        "43 1 0:31 / /some/other/path rw,relatime - tmpfs tmpfs rw\n",
    )
    fake_mountinfo.write_text("".join(mountinfo_lines))
    mountinfo_path = str(fake_mountinfo)

    # Listed mount point -> True; unlisted path -> False; unreadable
    # mountinfo file -> False.
    assert helper._is_mountpoint(bind_dir, mountinfo_path) is True
    assert helper._is_mountpoint(tmp_path / "not-a-mount", mountinfo_path) is False
    assert helper._is_mountpoint(bind_dir, str(tmp_path / "no-such-file")) is False