feat(left4me-overlay): idmap bind mounts for l4d2-sandbox-owned lowerdirs

Insert an idmapped bind mount in front of each lowerdir whose top-level
uid matches l4d2-sandbox at overlay-mount time, so that overlayfs copy-up
produces left4me-owned upperdir entries instead of EACCES.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
mwiegand 2026-05-14 23:48:07 +02:00
parent 3a2c379b71
commit 2f6a9cfba0
No known key found for this signature in database
2 changed files with 261 additions and 8 deletions

View file

@ -29,6 +29,7 @@ shell-quoted) and exit 0 instead of execv. Used by tests.
""" """
import os import os
import pwd
import re import re
import shlex import shlex
import shutil import shutil
@ -54,6 +55,45 @@ def die(msg: str) -> None:
sys.exit(1) sys.exit(1)
def _lookup_uid(username: str) -> tuple[int, int]:
    """Resolve *username* to its (uid, gid) pair via the passwd database.

    If no such account exists, the helper stops through ``die`` with a
    message that points at the deployment rather than at the caller.
    """
    try:
        record = pwd.getpwnam(username)
    except KeyError:
        # pwd.getpwnam signals a missing account with KeyError.
        die(
            f"required system user {username!r} does not exist; "
            "this is a deploy misconfiguration"
        )
    uid, gid = record.pw_uid, record.pw_gid
    return uid, gid
def _get_user_ids() -> tuple[int, int, int, int]:
    """Return (sandbox_uid, sandbox_gid, left4me_uid, left4me_gid).

    In normal operation the ids come from the real ``l4d2-sandbox`` and
    ``left4me`` system users. When all four test-only env vars
    _L4D2_SANDBOX_UID/_L4D2_SANDBOX_GID/_LEFT4ME_UID/_LEFT4ME_GID are set,
    their values are used directly so tests can run without root and without
    the real system users being present. If only some of them are set, the
    overrides are ignored and the real users are looked up.

    A non-integer override value is reported through ``die`` rather than
    surfacing as a bare ValueError traceback.
    """
    override_vars = (
        "_L4D2_SANDBOX_UID",
        "_L4D2_SANDBOX_GID",
        "_LEFT4ME_UID",
        "_LEFT4ME_GID",
    )
    # Keep only the variables that are actually present: every value is then
    # a plain str, so no Optional narrowing (or type: ignore) is needed.
    overrides = {
        var: os.environ[var] for var in override_vars if var in os.environ
    }
    if len(overrides) == len(override_vars):
        try:
            sandbox_uid, sandbox_gid, left4me_uid, left4me_gid = (
                int(overrides[var]) for var in override_vars
            )
        except ValueError:
            die(
                "test-only uid/gid override env vars must be integers; "
                f"got {overrides!r}"
            )
        return sandbox_uid, sandbox_gid, left4me_uid, left4me_gid

    # No complete override set: resolve the real system users.
    sandbox_uid, sandbox_gid = _lookup_uid("l4d2-sandbox")
    left4me_uid, left4me_gid = _lookup_uid("left4me")
    return sandbox_uid, sandbox_gid, left4me_uid, left4me_gid
def root() -> Path: def root() -> Path:
return Path(os.environ.get("LEFT4ME_ROOT") or DEFAULT_ROOT) return Path(os.environ.get("LEFT4ME_ROOT") or DEFAULT_ROOT)
@ -128,9 +168,14 @@ def assert_no_fuse_xattrs(upper: Path) -> None:
) )
def _print_argv(argv: list[str]) -> None:
    """Write *argv* to stdout as a single shell-quoted line.

    PRINT_ONLY helper: it only prints and never exits, so the caller decides
    when the process should stop.
    """
    quoted_line = shlex.join(argv)
    print(quoted_line)
def exec_or_print(argv: list[str]) -> None: def exec_or_print(argv: list[str]) -> None:
if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1": if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1":
print(" ".join(shlex.quote(a) for a in argv)) _print_argv(argv)
sys.exit(0) sys.exit(0)
os.execv(argv[0], argv) os.execv(argv[0], argv)
@ -168,7 +213,62 @@ def cmd_mount(name: str) -> None:
assert_no_fuse_xattrs(upper) assert_no_fuse_xattrs(upper)
options = f"lowerdir={':'.join(canonical_lowerdirs)},upperdir={upper},workdir={work}" # Resolve user ids now (fails fast on deploy misconfiguration).
sandbox_uid, sandbox_gid, left4me_uid, left4me_gid = _get_user_ids()
# Build the final lowerdir list, substituting idmap bind-mount paths for
# any lowerdir owned by l4d2-sandbox. An idmap bind mount makes the kernel
# see the l4d2-sandbox-owned tree as if it were owned by left4me, so that
# overlayfs copy-up produces left4me-owned upperdir entries.
idmap_dir = runtime_name_dir / "idmap"
final_lowerdirs: list[str] = []
bind_argvs: list[list[str]] = []
for lowerdir in canonical_lowerdirs:
st = os.stat(lowerdir)
if st.st_uid == sandbox_uid:
# This lowerdir needs idmap remapping.
overlay_id = Path(lowerdir).name
idmap_target = idmap_dir / overlay_id
if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") != "1":
idmap_dir.mkdir(mode=0o700, exist_ok=True)
idmap_target.mkdir(mode=0o700, exist_ok=True)
if not os.path.ismount(idmap_target) or \
os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1":
# --map-users / --map-groups argument format:
# <on-disk-uid>:<in-mount-uid>:<count>
# The util-linux man page calls these <inner>:<outer>, which is
# misleading. Empirically (verified on left4.me, kernel 6.12,
# ext4) the FIRST number is the on-disk uid and the SECOND is
# the uid exposed inside the mount. Don't swap them.
bind_argv = [
MOUNT_BIN,
"--bind",
f"--map-users={sandbox_uid}:{left4me_uid}:1",
f"--map-groups={sandbox_gid}:{left4me_gid}:1",
lowerdir,
str(idmap_target),
]
bind_argvs.append(bind_argv)
final_lowerdirs.append(str(idmap_target))
else:
final_lowerdirs.append(lowerdir)
print_only = os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1"
if print_only:
# Emit each bind-mount argv first, then fall through to the overlay argv.
for bind_argv in bind_argvs:
_print_argv(bind_argv)
else:
# Actually exec each bind mount before the overlay mount.
for bind_argv in bind_argvs:
subprocess.run(bind_argv, check=True)
options = f"lowerdir={':'.join(final_lowerdirs)},upperdir={upper},workdir={work}"
argv = [ argv = [
MOUNT_BIN, MOUNT_BIN,
"-t", "overlay", "-t", "overlay",
@ -186,17 +286,26 @@ def cmd_umount(name: str) -> None:
merged_path = runtime_name_dir / "merged" merged_path = runtime_name_dir / "merged"
work_inner = runtime_name_dir / "work" / "work" work_inner = runtime_name_dir / "work" / "work"
argv = [ overlay_umount_argv = [
UMOUNT_BIN, UMOUNT_BIN,
# Resolve only if it exists; PRINT_ONLY tests always pre-create it. # Resolve only if it exists; PRINT_ONLY tests always pre-create it.
str(merged_path.resolve(strict=True) if merged_path.exists() else merged_path), str(merged_path.resolve(strict=True) if merged_path.exists() else merged_path),
] ]
# PRINT_ONLY: emit the umount argv and exit. Tests assert exact shape # Collect idmap bind-umount argvs: one per direct subdir of runtime/<name>/idmap/.
# of this dry-run; the post-umount cleanup of work_inner is a runtime idmap_dir = runtime_name_dir / "idmap"
# behaviour exercised on the host, not in unit tests. bind_umount_argvs: list[list[str]] = []
if idmap_dir.is_dir():
for entry in sorted(idmap_dir.iterdir()):
if entry.is_dir():
bind_umount_argvs.append([UMOUNT_BIN, str(entry)])
# PRINT_ONLY: emit the overlay umount argv, then each bind-umount argv, then exit.
# Order matches real execution (overlay first, then idmap binds underneath).
if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1": if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1":
print(" ".join(shlex.quote(a) for a in argv)) _print_argv(overlay_umount_argv)
for bind_umount_argv in bind_umount_argvs:
_print_argv(bind_umount_argv)
sys.exit(0) sys.exit(0)
if merged_path.exists(): if merged_path.exists():
@ -214,7 +323,7 @@ def cmd_umount(name: str) -> None:
# reaped → umount-clears sequence happens without any race # reaped → umount-clears sequence happens without any race
# window for us to ride out. EBUSY here is a real error. # window for us to ride out. EBUSY here is a real error.
if os.path.ismount(merged): if os.path.ismount(merged):
subprocess.run(argv, check=True) subprocess.run(overlay_umount_argv, check=True)
# Kernel-overlayfs creates work_inner during mount with root:root mode # Kernel-overlayfs creates work_inner during mount with root:root mode
# 0/0. After unmount it's an orphan that the unit's User= (left4me) # 0/0. After unmount it's an orphan that the unit's User= (left4me)
@ -227,6 +336,15 @@ def cmd_umount(name: str) -> None:
if work_inner.exists(): if work_inner.exists():
shutil.rmtree(work_inner) shutil.rmtree(work_inner)
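# Unwind idmap bind mounts, then remove the idmap directory. Each bind
# is only umounted if os.path.ismount() still reports it as a mountpoint
# (idempotent across partial teardowns).
# NOTE(review): os.path.ismount() is documented as unable to reliably detect
# bind mounts on the same filesystem. If a lowerdir lives on the same
# filesystem as runtime/<name>/idmap, a live bind could be skipped here and
# the rmtree below would then recurse into the lowerdir through it. Confirm
# on the target host, and consider checking /proc/self/mountinfo instead.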
for bind_umount_argv in bind_umount_argvs:
target = Path(bind_umount_argv[-1])
if os.path.ismount(target):
subprocess.run(bind_umount_argv, check=True)
shutil.rmtree(idmap_dir, ignore_errors=True)
def main(argv: list[str]) -> None: def main(argv: list[str]) -> None:
if len(argv) != 3 or argv[1] not in ("mount", "umount"): if len(argv) != 3 or argv[1] not in ("mount", "umount"):

View file

@ -41,6 +41,11 @@ def _run(args: list[str], root: Path, extra_env: dict[str, str] | None = None) -
**os.environ, **os.environ,
"LEFT4ME_ROOT": str(root), "LEFT4ME_ROOT": str(root),
"LEFT4ME_OVERLAY_PRINT_ONLY": "1", "LEFT4ME_OVERLAY_PRINT_ONLY": "1",
# Inject synthetic user ids so tests work without real system users.
"_L4D2_SANDBOX_UID": str(FAKE_SANDBOX_UID),
"_L4D2_SANDBOX_GID": str(FAKE_SANDBOX_GID),
"_LEFT4ME_UID": str(FAKE_LEFT4ME_UID),
"_LEFT4ME_GID": str(FAKE_LEFT4ME_GID),
} }
if extra_env: if extra_env:
env.update(extra_env) env.update(extra_env)
@ -156,6 +161,136 @@ def test_rejects_empty_lowerdir_entry(tmp_path: Path) -> None:
assert "empty entry" in result.stderr assert "empty entry" in result.stderr
# Synthetic uid/gid values for the two system users. _run hands them to the
# helper through the test-only _L4D2_SANDBOX_*/_LEFT4ME_* environment
# variables, so the tests do not need the real accounts to exist.
FAKE_SANDBOX_UID = 7001
FAKE_SANDBOX_GID = 7001
FAKE_LEFT4ME_UID = 7002
FAKE_LEFT4ME_GID = 7002
def _setup_instance_with_uid(
    root: Path,
    name: str = "alpha",
    lowerdir_uid: int = FAKE_LEFT4ME_UID,
    lowerdir_gid: int = FAKE_LEFT4ME_GID,
) -> Path:
    """Build an instance tree under *root* whose first lowerdir has a chosen owner.

    Creates overlays/workshop and installation as the two lowerdirs, writes
    instances/<name>/instance.env listing them, and pre-creates
    runtime/<name>/{upper,work,merged}. The chown of overlays/workshop is
    best-effort. Returns the overlays/workshop path.
    """
    workshop = root / "overlays" / "workshop"
    installation = root / "installation"

    workshop.mkdir(parents=True, exist_ok=True)
    try:
        os.chown(workshop, lowerdir_uid, lowerdir_gid)
    except PermissionError:
        # Unprivileged runner: ownership stays as created. Callers that need
        # the requested owner must verify it themselves.
        pass
    installation.mkdir(parents=True, exist_ok=True)

    instance_dir = root / "instances" / name
    instance_dir.mkdir(parents=True, exist_ok=True)
    lowerdir_value = ":".join(str(path) for path in (workshop, installation))
    (instance_dir / "instance.env").write_text(
        f"L4D2_LOWERDIRS={lowerdir_value}\n"
    )

    for leaf in ("upper", "work", "merged"):
        (root / "runtime" / name / leaf).mkdir(parents=True, exist_ok=True)
    return workshop
def test_mount_idmaps_sandbox_owned_lowerdir(tmp_path: Path) -> None:
    """Mounting with a sandbox-owned lowerdir goes through an idmap bind.

    Expected PRINT_ONLY output is two argv lines: first a
    ``mount --bind --map-users/--map-groups`` onto
    runtime/alpha/idmap/workshop, then the overlay mount whose lowerdir=
    names that idmap path instead of the raw overlay directory.
    """
    workshop = _setup_instance_with_uid(
        tmp_path, lowerdir_uid=FAKE_SANDBOX_UID, lowerdir_gid=FAKE_SANDBOX_GID
    )
    # The setup helper tolerates a failed chown, so repeat it here to find
    # out whether ownership could really be changed. Without that, the
    # scenario under test cannot be produced.
    try:
        os.chown(workshop, FAKE_SANDBOX_UID, FAKE_SANDBOX_GID)
    except PermissionError:
        pytest.skip("chown requires root — skip on unprivileged runner")

    result = _run(["mount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv_lines = [row for row in result.stdout.splitlines() if row.strip()]
    assert len(argv_lines) == 2, f"expected 2 argv lines, got: {result.stdout!r}"
    bind_line, overlay_line = argv_lines

    # First line: the idmapped bind mount.
    bind_parts = shlex.split(bind_line)
    assert bind_parts[0] == "/bin/mount"
    assert "--bind" in bind_parts
    assert f"--map-users={FAKE_SANDBOX_UID}:{FAKE_LEFT4ME_UID}:1" in bind_parts
    assert f"--map-groups={FAKE_SANDBOX_GID}:{FAKE_LEFT4ME_GID}:1" in bind_parts
    assert bind_parts[-2] == str(workshop)
    idmap_target = str(tmp_path / "runtime" / "alpha" / "idmap" / "workshop")
    assert bind_parts[-1] == idmap_target

    # Second line: the overlay mount, stacked on the idmap path.
    overlay_parts = shlex.split(overlay_line)
    assert overlay_parts[0] == "/bin/mount"
    assert overlay_parts[1:3] == ["-t", "overlay"]
    mount_options = overlay_parts[5]
    assert f"lowerdir={idmap_target}:" in mount_options, \
        f"lowerdir should start with idmap path; got: {mount_options!r}"
    assert str(workshop) not in mount_options, \
        f"raw overlay path should not appear in lowerdir; got: {mount_options!r}"
def test_mount_skips_idmap_for_left4me_owned_lowerdir(tmp_path: Path) -> None:
    """No idmap bind mount is emitted when the lowerdir is not sandbox-owned."""
    workshop = _setup_instance_with_uid(
        tmp_path, lowerdir_uid=FAKE_LEFT4ME_UID, lowerdir_gid=FAKE_LEFT4ME_GID
    )
    try:
        os.chown(workshop, FAKE_LEFT4ME_UID, FAKE_LEFT4ME_GID)
    except PermissionError:
        # Best-effort only. Without root the directory keeps the owner it was
        # created with, which is assumed not to equal FAKE_SANDBOX_UID, so
        # the helper still has no reason to add an idmap bind.
        pass

    result = _run(["mount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv_lines = [row for row in result.stdout.splitlines() if row.strip()]
    assert len(argv_lines) == 1, f"expected 1 argv line (no bind mount), got: {result.stdout!r}"

    overlay_parts = shlex.split(argv_lines[0])
    assert overlay_parts[0] == "/bin/mount"
    assert "--bind" not in overlay_parts

    mount_options = overlay_parts[5]
    idmap_root = str(tmp_path / "runtime" / "alpha" / "idmap")
    assert idmap_root not in mount_options, f"idmap path should not appear; got: {mount_options!r}"
    assert str(workshop) in mount_options
def test_umount_unwinds_idmap_binds(tmp_path: Path) -> None:
    """PRINT_ONLY umount lists the overlay umount first, then one umount per idmap subdir."""
    _setup_instance(tmp_path)

    # Fake the leftovers of an earlier mount: runtime/alpha/idmap/workshop.
    runtime_dir = tmp_path / "runtime" / "alpha"
    idmap_root = runtime_dir / "idmap"
    idmap_root.mkdir(parents=True)
    workshop_bind = idmap_root / "workshop"
    workshop_bind.mkdir()

    result = _run(["umount", "alpha"], tmp_path)
    assert result.returncode == 0, result.stderr

    argv_lines = [row for row in result.stdout.splitlines() if row.strip()]
    assert len(argv_lines) >= 2, f"expected at least 2 argv lines, got: {result.stdout!r}"

    # The overlay itself is unmounted first ...
    expected_overlay_umount = ["/bin/umount", str(runtime_dir / "merged")]
    assert shlex.split(argv_lines[0]) == expected_overlay_umount

    # ... followed by the bind mount that sat underneath it.
    bind_umount = shlex.split(argv_lines[1])
    assert bind_umount[0] == "/bin/umount"
    assert bind_umount[-1] == str(workshop_bind)
@pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only") @pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only")
def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None: def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None:
_setup_instance(tmp_path) _setup_instance(tmp_path)