"""Filesystem helpers backing the overlay 'Files' section. `safe_resolve_for_listing` and `safe_resolve_for_download` translate a user-supplied sub-path within an overlay into a real filesystem path, rejecting anything that would escape the overlay root (listing) or `LEFT4ME_ROOT` (download). `list_directory` walks one level and shapes the rows the templates render. """ from __future__ import annotations import os from pathlib import Path from l4d2host.paths import get_left4me_root, overlay_path as resolve_overlay_root from l4d2web.services.security import validate_overlay_ref def _is_under(child: Path, parent: Path) -> bool: return child == parent or parent in child.parents def safe_resolve_for_listing(overlay_path_value: str, sub_path: str) -> Path: """Resolve `overlay_root / sub_path` and refuse anything that escapes the overlay root after symlink resolution. Used to render directory listings.""" overlay_root = resolve_overlay_root(overlay_path_value).resolve() if sub_path == "": return overlay_root validate_overlay_ref(sub_path) candidate = (overlay_root / sub_path).resolve(strict=False) if not _is_under(candidate, overlay_root): raise ValueError("path escapes overlay root") return candidate def safe_resolve_for_server_listing(server_id: int, sub_path: str) -> Path | None: """Resolve a path inside `runtime//merged` (the kernel-overlayfs composed view of a running server). Returns None if the merged dir doesn't exist yet (server has never started, or was just reset). Refuses paths that escape the merged root after symlink resolution. """ merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve() if not merged_root.is_dir(): return None if sub_path == "": return merged_root validate_overlay_ref(sub_path) candidate = (merged_root / sub_path).resolve(strict=False) if not _is_under(candidate, merged_root): raise ValueError("path escapes server merged root") return candidate DEFAULT_MAX_ENTRIES = 500 def list_directory( target: Path, overlay_root: Path, *, max_entries: int | None = None, ) -> tuple[list[dict], int]: """List one directory level. Returns (entries, truncated_count) where truncated_count is the number of entries elided after the cap. Directories come first, then files; both case-insensitive alphabetical. `max_entries` falls back to module-level `DEFAULT_MAX_ENTRIES` at call time so tests can monkeypatch the cap without re-instantiating routes.""" cap = max_entries if max_entries is not None else DEFAULT_MAX_ENTRIES rows: list[dict] = [] with os.scandir(target) as it: for entry in it: rows.append(_entry_dict(entry, overlay_root)) rows.sort(key=lambda r: (0 if r["kind"] == "dir" else 1, r["name"].casefold())) if len(rows) > cap: truncated = len(rows) - cap rows = rows[:cap] else: truncated = 0 return rows, truncated def _entry_dict(entry: "os.DirEntry[str]", overlay_root: Path) -> dict: is_symlink = entry.is_symlink() broken = False try: is_dir = entry.is_dir(follow_symlinks=True) except OSError: is_dir = False broken = True if is_dir: kind = "dir" size: int | None = None else: kind = "file" if broken: size = None else: try: size = entry.stat(follow_symlinks=True).st_size except OSError: size = None broken = True rel_str = "/".join(Path(entry.path).relative_to(overlay_root).parts) return { "name": entry.name, "rel": rel_str, "kind": kind, "is_symlink": is_symlink, "broken": broken, "size": size, "size_human": _format_size(size) if size is not None else "", } def _format_size(num: int) -> str: if num < 1024: return f"{num} B" units = ["KB", "MB", "GB", "TB"] value = float(num) unit = "B" for u in units: value /= 1024.0 unit = u if value < 1024: break return f"{value:.1f} {unit}" def safe_resolve_for_download(overlay_path_value: str, sub_path: str) -> Path: """Resolve a file path the user wants to download. Allows symlink targets anywhere inside `LEFT4ME_ROOT` (so workshop addons stream from the shared cache) but blocks any escape outside it.""" if sub_path == "": raise ValueError("download requires a file path") validate_overlay_ref(sub_path) overlay_root = resolve_overlay_root(overlay_path_value).resolve() candidate = overlay_root / sub_path real = Path(os.path.realpath(candidate)) left4me_root = get_left4me_root().resolve() if not _is_under(real, left4me_root): raise ValueError("path escapes LEFT4ME_ROOT") return real def safe_resolve_for_server_download(server_id: int, sub_path: str) -> Path | None: """Resolve a file inside `runtime//merged` for download. Follows symlinks (the merged view threads through `installation/` and overlay layers, all under LEFT4ME_ROOT). Refuses anything escaping LEFT4ME_ROOT. Returns None when the merged dir doesn't exist yet.""" if sub_path == "": raise ValueError("download requires a file path") validate_overlay_ref(sub_path) merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve() if not merged_root.is_dir(): return None candidate = merged_root / sub_path real = Path(os.path.realpath(candidate)) left4me_root = get_left4me_root().resolve() if not _is_under(real, left4me_root): raise ValueError("path escapes LEFT4ME_ROOT") return real