left4me/l4d2web/services/overlay_files.py
mwiegand ed12280cf0
feat(l4d2-web): server detail — directory tree of the runtime merged view
Adds a Files section at the bottom of the server detail page that lists
the kernel-overlayfs merged view at runtime/<server_id>/merged/. Reuses
the overlay file-tree partial via two new template variables:

- files_base_url: parent passes "/overlays/<id>" or "/servers/<id>"
- download_supported: false for servers (runtime holds large game
  binaries; no download endpoint), true for overlays (existing behavior)

New service helper safe_resolve_for_server_listing() rejects path
traversal beyond the merged root and returns None when the overlayfs
mount doesn't exist (server never started or just reset).

New route GET /servers/<id>/files?path=<rel> returns the lazy-load
file-tree fragment, gated to the server owner. No download counterpart.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 01:35:09 +02:00

149 lines
4.9 KiB
Python

"""Filesystem helpers backing the overlay 'Files' section.
`safe_resolve_for_listing` and `safe_resolve_for_download` translate a
user-supplied sub-path within an overlay into a real filesystem path,
rejecting anything that would escape the overlay root (listing) or
`LEFT4ME_ROOT` (download). `list_directory` walks one level and shapes
the rows the templates render.
"""
from __future__ import annotations
import os
from pathlib import Path
from l4d2host.paths import get_left4me_root, overlay_path as resolve_overlay_root
from l4d2web.services.security import validate_overlay_ref
def _is_under(child: Path, parent: Path) -> bool:
return child == parent or parent in child.parents
def safe_resolve_for_listing(overlay_path_value: str, sub_path: str) -> Path:
"""Resolve `overlay_root / sub_path` and refuse anything that escapes the
overlay root after symlink resolution. Used to render directory listings."""
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
if sub_path == "":
return overlay_root
validate_overlay_ref(sub_path)
candidate = (overlay_root / sub_path).resolve(strict=False)
if not _is_under(candidate, overlay_root):
raise ValueError("path escapes overlay root")
return candidate
def safe_resolve_for_server_listing(server_id: int, sub_path: str) -> Path | None:
"""Resolve a path inside `runtime/<server_id>/merged` (the kernel-overlayfs
composed view of a running server). Returns None if the merged dir doesn't
exist yet (server has never started, or was just reset). Refuses paths
that escape the merged root after symlink resolution.
"""
merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve()
if not merged_root.is_dir():
return None
if sub_path == "":
return merged_root
validate_overlay_ref(sub_path)
candidate = (merged_root / sub_path).resolve(strict=False)
if not _is_under(candidate, merged_root):
raise ValueError("path escapes server merged root")
return candidate
DEFAULT_MAX_ENTRIES = 500
def list_directory(
target: Path,
overlay_root: Path,
*,
max_entries: int | None = None,
) -> tuple[list[dict], int]:
"""List one directory level. Returns (entries, truncated_count) where
truncated_count is the number of entries elided after the cap.
Directories come first, then files; both case-insensitive alphabetical.
`max_entries` falls back to module-level `DEFAULT_MAX_ENTRIES` at call
time so tests can monkeypatch the cap without re-instantiating routes."""
cap = max_entries if max_entries is not None else DEFAULT_MAX_ENTRIES
rows: list[dict] = []
with os.scandir(target) as it:
for entry in it:
rows.append(_entry_dict(entry, overlay_root))
rows.sort(key=lambda r: (0 if r["kind"] == "dir" else 1, r["name"].casefold()))
if len(rows) > cap:
truncated = len(rows) - cap
rows = rows[:cap]
else:
truncated = 0
return rows, truncated
def _entry_dict(entry: "os.DirEntry[str]", overlay_root: Path) -> dict:
is_symlink = entry.is_symlink()
broken = False
try:
is_dir = entry.is_dir(follow_symlinks=True)
except OSError:
is_dir = False
broken = True
if is_dir:
kind = "dir"
size: int | None = None
else:
kind = "file"
if broken:
size = None
else:
try:
size = entry.stat(follow_symlinks=True).st_size
except OSError:
size = None
broken = True
rel_str = "/".join(Path(entry.path).relative_to(overlay_root).parts)
return {
"name": entry.name,
"rel": rel_str,
"kind": kind,
"is_symlink": is_symlink,
"broken": broken,
"size": size,
"size_human": _format_size(size) if size is not None else "",
}
def _format_size(num: int) -> str:
if num < 1024:
return f"{num} B"
units = ["KB", "MB", "GB", "TB"]
value = float(num)
unit = "B"
for u in units:
value /= 1024.0
unit = u
if value < 1024:
break
return f"{value:.1f} {unit}"
def safe_resolve_for_download(overlay_path_value: str, sub_path: str) -> Path:
"""Resolve a file path the user wants to download. Allows symlink targets
anywhere inside `LEFT4ME_ROOT` (so workshop addons stream from the shared
cache) but blocks any escape outside it."""
if sub_path == "":
raise ValueError("download requires a file path")
validate_overlay_ref(sub_path)
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
candidate = overlay_root / sub_path
real = Path(os.path.realpath(candidate))
left4me_root = get_left4me_root().resolve()
if not _is_under(real, left4me_root):
raise ValueError("path escapes LEFT4ME_ROOT")
return real