left4me/l4d2web/services/overlay_files.py
mwiegand c16e780283
feat(l4d2-web): server file tree — enable download symmetric with overlay tree
Adds a /servers/<id>/files/download route mirroring the overlay download
endpoint. Same safety rules: real-path must resolve under LEFT4ME_ROOT
(merged view threads through `installation/` and overlay layers, all
already inside the root). The server file-tree partial now renders
download links.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 01:40:04 +02:00

168 lines
5.7 KiB
Python

"""Filesystem helpers backing the overlay 'Files' section.
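`safe_resolve_for_listing` and `safe_resolve_for_download` translate a
user-supplied sub-path within an overlay into a real filesystem path,
rejecting anything that would escape the overlay root (listing) or
`LEFT4ME_ROOT` (download). `safe_resolve_for_server_listing` and
`safe_resolve_for_server_download` apply the same checks to a server's
`runtime/<server_id>/merged` view, bounded by the merged root (listing) or
`LEFT4ME_ROOT` (download). `list_directory` walks one level and shapes
the rows the templates render.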
"""
from __future__ import annotations
import os
from pathlib import Path
from l4d2host.paths import get_left4me_root, overlay_path as resolve_overlay_root
from l4d2web.services.security import validate_overlay_ref
def _is_under(child: Path, parent: Path) -> bool:
return child == parent or parent in child.parents
def safe_resolve_for_listing(overlay_path_value: str, sub_path: str) -> Path:
    """Map `sub_path` inside an overlay to a real path for directory listings.

    An empty `sub_path` yields the resolved overlay root itself. Any other
    value is first passed to `validate_overlay_ref` (whatever it raises
    propagates), then joined to the root and resolved with symlinks followed
    and missing components tolerated. Raises `ValueError` when the resolved
    path ends up outside the overlay root."""
    root = resolve_overlay_root(overlay_path_value).resolve()
    if sub_path != "":
        validate_overlay_ref(sub_path)
        resolved = root.joinpath(sub_path).resolve(strict=False)
        if _is_under(resolved, root):
            return resolved
        raise ValueError("path escapes overlay root")
    return root
def safe_resolve_for_server_listing(server_id: int, sub_path: str) -> Path | None:
    """Map `sub_path` inside `runtime/<server_id>/merged` to a real path.

    The merged directory is the composed view of a running server. When it
    is not an existing directory (server has never started, or was just
    reset) the function returns None without looking at `sub_path`. An empty
    `sub_path` yields the merged root itself; anything else goes through
    `validate_overlay_ref`, is resolved with symlinks followed, and triggers
    `ValueError` if the result lies outside the merged root.
    """
    runtime_dir = get_left4me_root() / "runtime" / str(server_id)
    root = (runtime_dir / "merged").resolve()
    if not root.is_dir():
        return None
    if sub_path != "":
        validate_overlay_ref(sub_path)
        resolved = root.joinpath(sub_path).resolve(strict=False)
        if _is_under(resolved, root):
            return resolved
        raise ValueError("path escapes server merged root")
    return root
# Default cap on the number of rows `list_directory` returns when the caller
# does not pass `max_entries`; read at call time, not at definition time.
DEFAULT_MAX_ENTRIES = 500
def list_directory(
target: Path,
overlay_root: Path,
*,
max_entries: int | None = None,
) -> tuple[list[dict], int]:
"""List one directory level. Returns (entries, truncated_count) where
truncated_count is the number of entries elided after the cap.
Directories come first, then files; both case-insensitive alphabetical.
`max_entries` falls back to module-level `DEFAULT_MAX_ENTRIES` at call
time so tests can monkeypatch the cap without re-instantiating routes."""
cap = max_entries if max_entries is not None else DEFAULT_MAX_ENTRIES
rows: list[dict] = []
with os.scandir(target) as it:
for entry in it:
rows.append(_entry_dict(entry, overlay_root))
rows.sort(key=lambda r: (0 if r["kind"] == "dir" else 1, r["name"].casefold()))
if len(rows) > cap:
truncated = len(rows) - cap
rows = rows[:cap]
else:
truncated = 0
return rows, truncated
def _entry_dict(entry: "os.DirEntry[str]", overlay_root: Path) -> dict:
is_symlink = entry.is_symlink()
broken = False
try:
is_dir = entry.is_dir(follow_symlinks=True)
except OSError:
is_dir = False
broken = True
if is_dir:
kind = "dir"
size: int | None = None
else:
kind = "file"
if broken:
size = None
else:
try:
size = entry.stat(follow_symlinks=True).st_size
except OSError:
size = None
broken = True
rel_str = "/".join(Path(entry.path).relative_to(overlay_root).parts)
return {
"name": entry.name,
"rel": rel_str,
"kind": kind,
"is_symlink": is_symlink,
"broken": broken,
"size": size,
"size_human": _format_size(size) if size is not None else "",
}
def _format_size(num: int) -> str:
if num < 1024:
return f"{num} B"
units = ["KB", "MB", "GB", "TB"]
value = float(num)
unit = "B"
for u in units:
value /= 1024.0
unit = u
if value < 1024:
break
return f"{value:.1f} {unit}"
def safe_resolve_for_download(overlay_path_value: str, sub_path: str) -> Path:
    """Turn an overlay-relative `sub_path` into the real file path to serve.

    Unlike the listing resolver, the symlink-resolved target only has to
    stay inside `LEFT4ME_ROOT`, not inside the overlay itself (so workshop
    addons can stream from the shared cache). Raises `ValueError` for an
    empty `sub_path` or for a target outside `LEFT4ME_ROOT`; whatever
    `validate_overlay_ref` raises propagates unchanged."""
    if sub_path == "":
        raise ValueError("download requires a file path")
    validate_overlay_ref(sub_path)

    root = resolve_overlay_root(overlay_path_value).resolve()
    real = Path(os.path.realpath(root.joinpath(sub_path)))

    allowed_root = get_left4me_root().resolve()
    if _is_under(real, allowed_root):
        return real
    raise ValueError("path escapes LEFT4ME_ROOT")
def safe_resolve_for_server_download(server_id: int, sub_path: str) -> Path | None:
"""Resolve a file inside `runtime/<server_id>/merged` for download. Follows
symlinks (the merged view threads through `installation/` and overlay
layers, all under LEFT4ME_ROOT). Refuses anything escaping LEFT4ME_ROOT.
Returns None when the merged dir doesn't exist yet."""
if sub_path == "":
raise ValueError("download requires a file path")
validate_overlay_ref(sub_path)
merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve()
if not merged_root.is_dir():
return None
candidate = merged_root / sub_path
real = Path(os.path.realpath(candidate))
left4me_root = get_left4me_root().resolve()
if not _is_under(real, left4me_root):
raise ValueError("path escapes LEFT4ME_ROOT")
return real