diff --git a/deploy/files/usr/local/libexec/left4me/left4me-overlay b/deploy/files/usr/local/libexec/left4me/left4me-overlay
index ccbb4c0..e52c798 100644
--- a/deploy/files/usr/local/libexec/left4me/left4me-overlay
+++ b/deploy/files/usr/local/libexec/left4me/left4me-overlay
@@ -29,6 +29,7 @@ shell-quoted) and exit 0 instead of execv. Used by tests.
 """
 
 import os
+import pwd
 import re
 import shlex
 import shutil
@@ -54,6 +55,45 @@ def die(msg: str) -> None:
     sys.exit(1)
 
 
+def _lookup_uid(username: str) -> tuple[int, int]:
+    """Return (uid, gid) for *username*, dying with a clear message if missing."""
+    try:
+        entry = pwd.getpwnam(username)
+    except KeyError:
+        die(
+            f"required system user {username!r} does not exist; "
+            "this is a deploy misconfiguration"
+        )
+    return entry.pw_uid, entry.pw_gid
+
+
+def _get_user_ids() -> tuple[int, int, int, int]:
+    """Return (sandbox_uid, sandbox_gid, left4me_uid, left4me_gid).
+
+    In normal operation, looks up the real system users. When the test-only
+    env vars _L4D2_SANDBOX_UID/_L4D2_SANDBOX_GID/_LEFT4ME_UID/_LEFT4ME_GID
+    are set, those values are used directly so tests can run without root
+    and without real system users being present.
+    """
+    sandbox_uid_env = os.environ.get("_L4D2_SANDBOX_UID")
+    sandbox_gid_env = os.environ.get("_L4D2_SANDBOX_GID")
+    left4me_uid_env = os.environ.get("_LEFT4ME_UID")
+    left4me_gid_env = os.environ.get("_LEFT4ME_GID")
+
+    if all(v is not None for v in (sandbox_uid_env, sandbox_gid_env,
+                                   left4me_uid_env, left4me_gid_env)):
+        return (
+            int(sandbox_uid_env),  # type: ignore[arg-type]
+            int(sandbox_gid_env),  # type: ignore[arg-type]
+            int(left4me_uid_env),  # type: ignore[arg-type]
+            int(left4me_gid_env),  # type: ignore[arg-type]
+        )
+
+    sandbox_uid, sandbox_gid = _lookup_uid("l4d2-sandbox")
+    left4me_uid, left4me_gid = _lookup_uid("left4me")
+    return sandbox_uid, sandbox_gid, left4me_uid, left4me_gid
+
+
 def root() -> Path:
     return Path(os.environ.get("LEFT4ME_ROOT") or DEFAULT_ROOT)
 
@@ -128,9 +168,42 @@ def assert_no_fuse_xattrs(upper: Path) -> None:
     )
 
 
+def _is_mountpoint(path: Path) -> bool:
+    """Return True if *path* is a mount point, same-filesystem binds included.
+
+    os.path.ismount() compares st_dev/st_ino with the parent directory, so it
+    cannot see a bind mount whose source lives on the same filesystem -- which
+    an idmap bind of a lowerdir can be. Field 5 of each
+    /proc/self/mountinfo line is the (octal-escaped) mount point; fall back
+    to os.path.ismount() only when procfs cannot be read.
+    """
+    wanted = os.fsencode(os.path.realpath(path))
+    try:
+        with open("/proc/self/mountinfo", "rb") as mountinfo:
+            for line in mountinfo:
+                fields = line.split(b" ", 5)
+                if len(fields) < 5:
+                    continue
+                mount_point = re.sub(
+                    rb"\\([0-3][0-7]{2})",
+                    lambda m: bytes([int(m.group(1), 8)]),
+                    fields[4],
+                )
+                if mount_point == wanted:
+                    return True
+    except OSError:
+        return os.path.ismount(path)
+    return False
+
+
+def _print_argv(argv: list[str]) -> None:
+    """Emit one shell-quoted argv line to stdout (PRINT_ONLY helper, no exit)."""
+    print(" ".join(shlex.quote(a) for a in argv))
+
+
 def exec_or_print(argv: list[str]) -> None:
     if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1":
-        print(" ".join(shlex.quote(a) for a in argv))
+        _print_argv(argv)
         sys.exit(0)
 
     os.execv(argv[0], argv)
@@ -168,7 +241,67 @@ def cmd_mount(name: str) -> None:
 
     assert_no_fuse_xattrs(upper)
 
-    options = f"lowerdir={':'.join(canonical_lowerdirs)},upperdir={upper},workdir={work}"
+    # Resolve user ids now (fails fast on deploy misconfiguration).
+    sandbox_uid, sandbox_gid, left4me_uid, left4me_gid = _get_user_ids()
+    print_only = os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1"
+
+    # Build the final lowerdir list, substituting idmap bind-mount paths for
+    # any lowerdir owned by l4d2-sandbox. An idmap bind mount makes the kernel
+    # see the l4d2-sandbox-owned tree as if it were owned by left4me, so that
+    # overlayfs copy-up produces left4me-owned upperdir entries.
+    idmap_dir = runtime_name_dir / "idmap"
+    final_lowerdirs: list[str] = []
+    bind_argvs: list[list[str]] = []
+
+    for lowerdir in canonical_lowerdirs:
+        if os.stat(lowerdir).st_uid != sandbox_uid:
+            final_lowerdirs.append(lowerdir)
+            continue
+
+        # This lowerdir needs idmap remapping. Targets are named after the
+        # lowerdir's basename; add a numeric suffix when two lowerdirs share
+        # one, otherwise both would silently resolve to the same bind mount.
+        overlay_id = Path(lowerdir).name
+        idmap_target = idmap_dir / overlay_id
+        suffix = 1
+        while str(idmap_target) in final_lowerdirs:
+            suffix += 1
+            idmap_target = idmap_dir / f"{overlay_id}.{suffix}"
+
+        if not print_only:
+            idmap_dir.mkdir(mode=0o700, exist_ok=True)
+            idmap_target.mkdir(mode=0o700, exist_ok=True)
+
+        # _is_mountpoint, not os.path.ismount: the latter cannot see a bind
+        # mount on the same filesystem and would stack a second bind here.
+        if print_only or not _is_mountpoint(idmap_target):
+            # --map-users / --map-groups argument format:
+            #   ON-DISK-ID:IN-MOUNT-ID:COUNT
+            # The util-linux man page calls these inner:outer, which is
+            # misleading. Empirically (verified on left4.me, kernel 6.12,
+            # ext4) the FIRST number is the on-disk uid and the SECOND is
+            # the uid exposed inside the mount. Don't swap them.
+            bind_argvs.append([
+                MOUNT_BIN,
+                "--bind",
+                f"--map-users={sandbox_uid}:{left4me_uid}:1",
+                f"--map-groups={sandbox_gid}:{left4me_gid}:1",
+                lowerdir,
+                str(idmap_target),
+            ])
+
+        final_lowerdirs.append(str(idmap_target))
+
+    if print_only:
+        # Emit each bind-mount argv first, then fall through to the overlay argv.
+        for bind_argv in bind_argvs:
+            _print_argv(bind_argv)
+    else:
+        # Actually exec each bind mount before the overlay mount.
+        for bind_argv in bind_argvs:
+            subprocess.run(bind_argv, check=True)
+
+    options = f"lowerdir={':'.join(final_lowerdirs)},upperdir={upper},workdir={work}"
     argv = [
         MOUNT_BIN,
         "-t", "overlay",
@@ -186,17 +319,26 @@ def cmd_umount(name: str) -> None:
     merged_path = runtime_name_dir / "merged"
     work_inner = runtime_name_dir / "work" / "work"
 
-    argv = [
+    overlay_umount_argv = [
         UMOUNT_BIN,
         # Resolve only if it exists; PRINT_ONLY tests always pre-create it.
         str(merged_path.resolve(strict=True) if merged_path.exists() else merged_path),
     ]
 
-    # PRINT_ONLY: emit the umount argv and exit. Tests assert exact shape
-    # of this dry-run; the post-umount cleanup of work_inner is a runtime
-    # behaviour exercised on the host, not in unit tests.
+    # Collect idmap bind-umount argvs: one per direct subdir of runtime/{name}/idmap/.
+    idmap_dir = runtime_name_dir / "idmap"
+    bind_umount_argvs: list[list[str]] = []
+    if idmap_dir.is_dir():
+        for entry in sorted(idmap_dir.iterdir()):
+            if entry.is_dir():
+                bind_umount_argvs.append([UMOUNT_BIN, str(entry)])
+
+    # PRINT_ONLY: emit the overlay umount argv, then each bind-umount argv, then exit.
+    # Order matches real execution (overlay first, then idmap binds underneath).
     if os.environ.get("LEFT4ME_OVERLAY_PRINT_ONLY") == "1":
-        print(" ".join(shlex.quote(a) for a in argv))
+        _print_argv(overlay_umount_argv)
+        for bind_umount_argv in bind_umount_argvs:
+            _print_argv(bind_umount_argv)
         sys.exit(0)
 
     if merged_path.exists():
@@ -214,7 +356,7 @@ def cmd_umount(name: str) -> None:
         # reaped → umount-clears sequence happens without any race
         # window for us to ride out. EBUSY here is a real error.
         if os.path.ismount(merged):
-            subprocess.run(argv, check=True)
+            subprocess.run(overlay_umount_argv, check=True)
 
     # Kernel-overlayfs creates work_inner during mount with root:root mode
     # 0/0. After unmount it's an orphan that the unit's User= (left4me)
@@ -227,6 +369,27 @@ def cmd_umount(name: str) -> None:
     if work_inner.exists():
         shutil.rmtree(work_inner)
 
+    # Unwind idmap bind mounts, then remove the idmap directory. Detection
+    # uses _is_mountpoint because os.path.ismount() cannot see a bind mount
+    # on the same filesystem; "while" also pops binds stacked by an earlier
+    # mount run. Already-detached targets are skipped (idempotent across
+    # partial teardowns).
+    for bind_umount_argv in bind_umount_argvs:
+        while _is_mountpoint(Path(bind_umount_argv[-1])):
+            subprocess.run(bind_umount_argv, check=True)
+
+    # Remove only the now-empty mountpoint directories. Never rmtree here: a
+    # recursive delete over a bind that is still attached would descend into
+    # the real lowerdir and erase its content.
+    leftovers = [Path(bind_umount_argv[-1]) for bind_umount_argv in bind_umount_argvs]
+    if idmap_dir.is_dir():
+        leftovers.append(idmap_dir)
+    for leftover in leftovers:
+        try:
+            leftover.rmdir()
+        except OSError as exc:
+            print(f"left4me-overlay: could not remove {leftover}: {exc}", file=sys.stderr)
+
 
 def main(argv: list[str]) -> None:
     if len(argv) != 3 or argv[1] not in ("mount", "umount"):
diff --git a/l4d2host/tests/test_overlay_helper.py b/l4d2host/tests/test_overlay_helper.py
index a098daf..b97381d 100644
--- a/l4d2host/tests/test_overlay_helper.py
+++ b/l4d2host/tests/test_overlay_helper.py
@@ -41,6 +41,11 @@ def _run(args: list[str], root: Path, extra_env: dict[str, str] | None = None) -
         **os.environ,
         "LEFT4ME_ROOT": str(root),
         "LEFT4ME_OVERLAY_PRINT_ONLY": "1",
+        # Inject synthetic user ids so tests work without real system users.
+        "_L4D2_SANDBOX_UID": str(FAKE_SANDBOX_UID),
+        "_L4D2_SANDBOX_GID": str(FAKE_SANDBOX_GID),
+        "_LEFT4ME_UID": str(FAKE_LEFT4ME_UID),
+        "_LEFT4ME_GID": str(FAKE_LEFT4ME_GID),
     }
     if extra_env:
         env.update(extra_env)
@@ -156,6 +161,136 @@ def test_rejects_empty_lowerdir_entry(tmp_path: Path) -> None:
     assert "empty entry" in result.stderr
 
 
+FAKE_SANDBOX_UID = 7001
+FAKE_SANDBOX_GID = 7001
+FAKE_LEFT4ME_UID = 7002
+FAKE_LEFT4ME_GID = 7002
+
+
+def _setup_instance_with_uid(
+    root: Path,
+    name: str = "alpha",
+    lowerdir_uid: int = FAKE_LEFT4ME_UID,
+    lowerdir_gid: int = FAKE_LEFT4ME_GID,
+) -> Path:
+    """Like _setup_instance but chowns the lowerdir to a specific uid/gid."""
+    overlay_dir = root / "overlays" / "workshop"
+    overlay_dir.mkdir(parents=True, exist_ok=True)
+    try:
+        os.chown(overlay_dir, lowerdir_uid, lowerdir_gid)
+    except PermissionError:
+        pass  # tests not running as root — uid won't match; that's fine for the "skips idmap" test
+    (root / "installation").mkdir(parents=True, exist_ok=True)
+    lowerdirs = [str(overlay_dir), str(root / "installation")]
+    inst_dir = root / "instances" / name
+    inst_dir.mkdir(parents=True, exist_ok=True)
+    (inst_dir / "instance.env").write_text(
+        f"L4D2_LOWERDIRS={':'.join(lowerdirs)}\n"
+    )
+    runtime = root / "runtime" / name
+    (runtime / "upper").mkdir(parents=True, exist_ok=True)
+    (runtime / "work").mkdir(parents=True, exist_ok=True)
+    (runtime / "merged").mkdir(parents=True, exist_ok=True)
+    return overlay_dir
+
+
+
+def test_mount_idmaps_sandbox_owned_lowerdir(tmp_path: Path) -> None:
+    """A lowerdir owned by l4d2-sandbox uid triggers an idmap bind mount.
+
+    The overlay lowerdir= string must reference the idmap path, not the raw
+    overlay path. A mount --bind --map-users/--map-groups argv must be emitted
+    before the overlay mount argv.
+    """
+    overlay_dir = _setup_instance_with_uid(
+        tmp_path, lowerdir_uid=FAKE_SANDBOX_UID, lowerdir_gid=FAKE_SANDBOX_GID
+    )
+    try:
+        os.chown(overlay_dir, FAKE_SANDBOX_UID, FAKE_SANDBOX_GID)
+    except PermissionError:
+        pytest.skip("chown requires root — skip on unprivileged runner")
+
+    result = _run(["mount", "alpha"], tmp_path)
+    assert result.returncode == 0, result.stderr
+
+    lines = [l for l in result.stdout.splitlines() if l.strip()]
+    assert len(lines) == 2, f"expected 2 argv lines, got: {result.stdout!r}"
+
+    bind_parts = shlex.split(lines[0])
+    assert bind_parts[0] == "/bin/mount"
+    assert "--bind" in bind_parts
+    assert f"--map-users={FAKE_SANDBOX_UID}:{FAKE_LEFT4ME_UID}:1" in bind_parts
+    assert f"--map-groups={FAKE_SANDBOX_GID}:{FAKE_LEFT4ME_GID}:1" in bind_parts
+    assert bind_parts[-2] == str(overlay_dir)
+    idmap_target = str(tmp_path / "runtime" / "alpha" / "idmap" / "workshop")
+    assert bind_parts[-1] == idmap_target
+
+    overlay_parts = shlex.split(lines[1])
+    assert overlay_parts[0] == "/bin/mount"
+    assert overlay_parts[1:3] == ["-t", "overlay"]
+    options = overlay_parts[5]
+    assert f"lowerdir={idmap_target}:" in options, \
+        f"lowerdir should start with idmap path; got: {options!r}"
+    assert str(overlay_dir) not in options, \
+        f"raw overlay path should not appear in lowerdir; got: {options!r}"
+
+
+def test_mount_skips_idmap_for_left4me_owned_lowerdir(tmp_path: Path) -> None:
+    """A lowerdir already owned by the left4me uid needs no idmap bind mount."""
+    overlay_dir = _setup_instance_with_uid(
+        tmp_path, lowerdir_uid=FAKE_LEFT4ME_UID, lowerdir_gid=FAKE_LEFT4ME_GID
+    )
+    # Best-effort chown to the left4me uid — skip if not root.
+    try:
+        os.chown(overlay_dir, FAKE_LEFT4ME_UID, FAKE_LEFT4ME_GID)
+    except PermissionError:
+        # Without root, st_uid is 0 or our own uid; neither matches FAKE_SANDBOX_UID,
+        # so the helper will correctly skip the idmap bind either way.
+        pass
+
+    result = _run(["mount", "alpha"], tmp_path)
+    assert result.returncode == 0, result.stderr
+
+    lines = [l for l in result.stdout.splitlines() if l.strip()]
+    assert len(lines) == 1, f"expected 1 argv line (no bind mount), got: {result.stdout!r}"
+
+    overlay_parts = shlex.split(lines[0])
+    assert overlay_parts[0] == "/bin/mount"
+    assert "--bind" not in overlay_parts
+    options = overlay_parts[5]
+    idmap_subdir = str(tmp_path / "runtime" / "alpha" / "idmap")
+    assert idmap_subdir not in options, f"idmap path should not appear; got: {options!r}"
+    assert str(overlay_dir) in options
+
+
+def test_umount_unwinds_idmap_binds(tmp_path: Path) -> None:
+    """umount emits bind-umount lines for each idmap subdir, after the overlay umount."""
+    _setup_instance(tmp_path)
+    # Pre-seed an idmap subdir as if a previous mount had set it up.
+    idmap_dir = tmp_path / "runtime" / "alpha" / "idmap"
+    idmap_dir.mkdir(parents=True)
+    idmap_sub = idmap_dir / "workshop"
+    idmap_sub.mkdir()
+
+    result = _run(["umount", "alpha"], tmp_path)
+    assert result.returncode == 0, result.stderr
+
+    lines = [l for l in result.stdout.splitlines() if l.strip()]
+    assert len(lines) >= 2, f"expected at least 2 argv lines, got: {result.stdout!r}"
+
+    # First line: overlay umount
+    overlay_umount_parts = shlex.split(lines[0])
+    assert overlay_umount_parts == [
+        "/bin/umount",
+        str(tmp_path / "runtime" / "alpha" / "merged"),
+    ]
+
+    # Subsequent lines: bind umounts for each idmap subdir
+    bind_umount_parts = shlex.split(lines[1])
+    assert bind_umount_parts[0] == "/bin/umount"
+    assert bind_umount_parts[-1] == str(idmap_sub)
+
+
 @pytest.mark.skipif(sys.platform != "linux", reason="user.* xattrs are Linux-only")
 def test_rejects_upperdir_with_fuseoverlayfs_xattr(tmp_path: Path) -> None:
     _setup_instance(tmp_path)