"""Filesystem helpers backing the overlay 'Files' section. `safe_resolve_for_listing` and `safe_resolve_for_download` translate a user-supplied sub-path within an overlay into a real filesystem path, rejecting anything that would escape the overlay root (listing) or `LEFT4ME_ROOT` (download). `list_directory` walks one level and shapes the rows the templates render. """ from __future__ import annotations import os from pathlib import Path from l4d2host.paths import get_left4me_root, overlay_path as resolve_overlay_root from l4d2web.services.security import validate_overlay_ref def _is_under(child: Path, parent: Path) -> bool: return child == parent or parent in child.parents def safe_resolve_for_listing(overlay_path_value: str, sub_path: str) -> Path: """Resolve `overlay_root / sub_path` and refuse anything that escapes the overlay root after symlink resolution. Used to render directory listings.""" overlay_root = resolve_overlay_root(overlay_path_value).resolve() if sub_path == "": return overlay_root validate_overlay_ref(sub_path) candidate = (overlay_root / sub_path).resolve(strict=False) if not _is_under(candidate, overlay_root): raise ValueError("path escapes overlay root") return candidate def safe_resolve_for_write(overlay_path_value: str, sub_path: str) -> Path: """Resolve a destination path for create / write / replace operations. Refuses empty `sub_path`, escape via `..` or symlink target, refuses to overwrite an existing symlink, and refuses a path whose parent resolves to a non-directory. Returns the lexical (un-resolved) candidate so the caller can write through to the user-visible path; the safety check is done against the resolved variant. """ if sub_path == "": raise ValueError("write requires a sub-path") validate_overlay_ref(sub_path) overlay_root = resolve_overlay_root(overlay_path_value).resolve() candidate = overlay_root / sub_path resolved_candidate = candidate.resolve(strict=False) if not _is_under(resolved_candidate, overlay_root): raise ValueError("path escapes overlay root") if candidate.is_symlink(): raise ValueError("refusing to overwrite an existing symlink") parent = candidate.parent if parent.exists() and not parent.is_dir(): raise ValueError("parent path is not a directory") return candidate def safe_resolve_for_delete(overlay_path_value: str, sub_path: str) -> Path: """Resolve a path the user wants to delete. Same root-escape rules as `safe_resolve_for_listing`. Allows files and (empty) directories; the caller is responsible for refusing recursive directory removal.""" if sub_path == "": raise ValueError("delete requires a sub-path") validate_overlay_ref(sub_path) overlay_root = resolve_overlay_root(overlay_path_value).resolve() candidate = overlay_root / sub_path resolved_candidate = candidate.resolve(strict=False) if not _is_under(resolved_candidate, overlay_root): raise ValueError("path escapes overlay root") return candidate def safe_resolve_for_move( overlay_path_value: str, src: str, dst: str ) -> tuple[Path, Path]: """Resolve src + dst for a rename/move. Both must be inside the overlay root. Refuses cycle (dst inside src), missing src, missing/non-dir parent of dst, and overwriting a symlink at dst.""" if src == "" or dst == "": raise ValueError("move requires non-empty src and dst") validate_overlay_ref(src) validate_overlay_ref(dst) overlay_root = resolve_overlay_root(overlay_path_value).resolve() src_path = overlay_root / src dst_path = overlay_root / dst src_resolved = src_path.resolve(strict=False) dst_resolved = dst_path.resolve(strict=False) if not _is_under(src_resolved, overlay_root): raise ValueError("src escapes overlay root") if not _is_under(dst_resolved, overlay_root): raise ValueError("dst escapes overlay root") if not src_path.exists() and not src_path.is_symlink(): raise ValueError("src does not exist") if dst_path.is_symlink(): raise ValueError("refusing to overwrite an existing symlink at dst") dst_parent = dst_path.parent if dst_parent.exists() and not dst_parent.is_dir(): raise ValueError("dst parent is not a directory") if src_path.is_dir(): # Cycle check: dst must not be src or any descendant of src. if dst_resolved == src_resolved or src_resolved in dst_resolved.parents: raise ValueError("cannot move a directory into itself") return src_path, dst_path _EDITABLE_MAX_BYTES = 1 * 1024 * 1024 _UTF8_SNIFF_BYTES = 8 * 1024 def is_editable(path: Path) -> bool: """True iff the file is a regular file (not symlink), ≤ 1 MiB, the first 8 KiB decodes as strict UTF-8, and contains no NUL bytes. NULs decode as valid UTF-8 (U+0000) but render badly in textareas and almost always indicate binary content; rejecting them matches the "is this a text file" intuition users expect. Save endpoint enforces the same rules.""" try: if path.is_symlink(): return False if not path.is_file(): return False size = path.stat().st_size except OSError: return False if size > _EDITABLE_MAX_BYTES: return False try: with path.open("rb") as f: sample = f.read(_UTF8_SNIFF_BYTES) except OSError: return False if b"\x00" in sample: return False try: sample.decode("utf-8", errors="strict") except UnicodeDecodeError: return False return True def safe_resolve_for_server_listing(server_id: int, sub_path: str) -> Path | None: """Resolve a path inside `runtime//merged` (the kernel-overlayfs composed view of a running server). Returns None if the merged dir doesn't exist yet (server has never started, or was just reset). Refuses paths that escape the merged root after symlink resolution. """ merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve() if not merged_root.is_dir(): return None if sub_path == "": return merged_root validate_overlay_ref(sub_path) candidate = (merged_root / sub_path).resolve(strict=False) if not _is_under(candidate, merged_root): raise ValueError("path escapes server merged root") return candidate DEFAULT_MAX_ENTRIES = 500 def list_directory( target: Path, overlay_root: Path, *, max_entries: int | None = None, ) -> tuple[list[dict], int]: """List one directory level. Returns (entries, truncated_count) where truncated_count is the number of entries elided after the cap. Directories come first, then files; both case-insensitive alphabetical. `max_entries` falls back to module-level `DEFAULT_MAX_ENTRIES` at call time so tests can monkeypatch the cap without re-instantiating routes.""" cap = max_entries if max_entries is not None else DEFAULT_MAX_ENTRIES rows: list[dict] = [] with os.scandir(target) as it: for entry in it: rows.append(_entry_dict(entry, overlay_root)) rows.sort(key=lambda r: (0 if r["kind"] == "dir" else 1, r["name"].casefold())) if len(rows) > cap: truncated = len(rows) - cap rows = rows[:cap] else: truncated = 0 return rows, truncated def _entry_dict(entry: "os.DirEntry[str]", overlay_root: Path) -> dict: is_symlink = entry.is_symlink() broken = False try: is_dir = entry.is_dir(follow_symlinks=True) except OSError: is_dir = False broken = True if is_dir: kind = "dir" size: int | None = None else: kind = "file" if broken: size = None else: try: size = entry.stat(follow_symlinks=True).st_size except OSError: size = None broken = True rel_str = "/".join(Path(entry.path).relative_to(overlay_root).parts) if kind == "file" and not broken: editable = is_editable(Path(entry.path)) else: editable = False return { "name": entry.name, "rel": rel_str, "kind": kind, "is_symlink": is_symlink, "broken": broken, "size": size, "size_human": _format_size(size) if size is not None else "", "editable": editable, } def _format_size(num: int) -> str: if num < 1024: return f"{num} B" units = ["KB", "MB", "GB", "TB"] value = float(num) unit = "B" for u in units: value /= 1024.0 unit = u if value < 1024: break return f"{value:.1f} {unit}" def safe_resolve_for_download(overlay_path_value: str, sub_path: str) -> Path: """Resolve a file path the user wants to download. Allows symlink targets anywhere inside `LEFT4ME_ROOT` (so workshop addons stream from the shared cache) but blocks any escape outside it.""" if sub_path == "": raise ValueError("download requires a file path") validate_overlay_ref(sub_path) overlay_root = resolve_overlay_root(overlay_path_value).resolve() candidate = overlay_root / sub_path real = Path(os.path.realpath(candidate)) left4me_root = get_left4me_root().resolve() if not _is_under(real, left4me_root): raise ValueError("path escapes LEFT4ME_ROOT") return real def safe_resolve_for_server_download(server_id: int, sub_path: str) -> Path | None: """Resolve a file inside `runtime//merged` for download. Follows symlinks (the merged view threads through `installation/` and overlay layers, all under LEFT4ME_ROOT). Refuses anything escaping LEFT4ME_ROOT. Returns None when the merged dir doesn't exist yet.""" if sub_path == "": raise ValueError("download requires a file path") validate_overlay_ref(sub_path) merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve() if not merged_root.is_dir(): return None candidate = merged_root / sub_path real = Path(os.path.realpath(candidate)) left4me_root = get_left4me_root().resolve() if not _is_under(real, left4me_root): raise ValueError("path escapes LEFT4ME_ROOT") return real