left4me/l4d2web/l4d2web/services/overlay_files.py
mwiegand 49992b3a26
refactor(repo): uv workspace + hatchling + layout restructure
Migrate from pip-install-e + setuptools to a uv workspace with a
committed uv.lock for deterministic deps. Switch both members to
hatchling, and move package sources into nested standard layout
(l4d2host/l4d2host/, l4d2web/l4d2web/) so builds work from a
read-only source tree — setuptools wrote egg-info to source under
the old layout, which broke uv sync on the root-owned /opt/left4me/src.

Local dev install: `pip install -e ./l4d2host -e ./l4d2web` -> `uv sync`.
.envrc switches from `layout python python3.13` to `use uv`. Python
pinned to 3.13 via .python-version.

l4d2web now declares its cross-dep on l4d2host explicitly via
[tool.uv.sources] (workspace = true). l4d2web/alembic.ini and
l4d2web/alembic/ stay at the project root (standard alembic layout).

Test fixes:
- tests/__init__.py added to both test dirs so pytest doesn't shadow
  l4d2host as a namespace package via outer-dir walk.
- 3 CWD-relative paths in tests (l4d2web/static/css/{tokens,layout}.css
  and js/sse.js) anchored to Path(__file__) so they survive layout
  changes.
- Two test_install.py tests now monkeypatch HOME to tmp_path so they
  stop silently mutating ~/.steam/sdk32 on every run.

628 tests pass under sandboxed `uv run pytest`.

Per docs/superpowers/plans/2026-05-15-uv-workspace-execution.md;
prereq for the ckn-bw bundle's uv-sync action (queued).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 22:04:29 +02:00

281 lines
10 KiB
Python

"""Filesystem helpers backing the overlay 'Files' section.

The `safe_resolve_for_*` helpers translate a user-supplied sub-path within
an overlay (or within a server's merged runtime view) into a real
filesystem path, rejecting anything that would escape the overlay or merged
root (listing, write, delete, move) or `LEFT4ME_ROOT` (download).
`is_editable` decides whether a file may be opened in the text editor, and
`list_directory` walks one level and shapes the rows the templates render.
"""
from __future__ import annotations
import os
from pathlib import Path
from l4d2host.paths import get_left4me_root, overlay_path as resolve_overlay_root
from l4d2web.services.security import validate_overlay_ref
def _is_under(child: Path, parent: Path) -> bool:
return child == parent or parent in child.parents
def safe_resolve_for_listing(overlay_path_value: str, sub_path: str) -> Path:
    """Turn `sub_path` into a resolved path inside the overlay, for listings.

    An empty `sub_path` means the overlay root itself. Any other value is
    first checked with `validate_overlay_ref`, then joined onto the resolved
    overlay root and resolved again (following symlinks, without requiring
    the target to exist).

    Raises:
        ValueError: if the fully resolved path is not the overlay root or a
            descendant of it.
    """
    root = resolve_overlay_root(overlay_path_value).resolve()
    if sub_path != "":
        validate_overlay_ref(sub_path)
        resolved = (root / sub_path).resolve(strict=False)
        if _is_under(resolved, root):
            return resolved
        raise ValueError("path escapes overlay root")
    return root
def safe_resolve_for_write(overlay_path_value: str, sub_path: str) -> Path:
    """Work out where a create / write / replace operation may land.

    The returned path is the lexical `overlay_root / sub_path` (symlinks in
    it are NOT collapsed) so the caller writes through the user-visible
    location; only the safety check uses the symlink-resolved form.

    Raises:
        ValueError: when `sub_path` is empty, when the resolved destination
            falls outside the overlay root, when the destination itself is
            an existing symlink, or when the destination's parent exists but
            is not a directory. A parent that does not exist yet is accepted.
    """
    if sub_path == "":
        raise ValueError("write requires a sub-path")
    validate_overlay_ref(sub_path)

    root = resolve_overlay_root(overlay_path_value).resolve()
    target = root / sub_path

    # Containment is judged on the resolved path so a symlinked component
    # cannot smuggle the write outside the overlay.
    if not _is_under(target.resolve(strict=False), root):
        raise ValueError("path escapes overlay root")
    if target.is_symlink():
        raise ValueError("refusing to overwrite an existing symlink")

    holder = target.parent
    if holder.exists() and not holder.is_dir():
        raise ValueError("parent path is not a directory")
    return target
def safe_resolve_for_delete(overlay_path_value: str, sub_path: str) -> Path:
    """Validate a deletion target and return its lexical path in the overlay.

    The containment rule matches `safe_resolve_for_listing`: the
    symlink-resolved form of the target must stay within the overlay root.
    The un-resolved path is what gets returned. This helper does not look at
    whether the target is a file or a directory; refusing recursive
    directory removal is left to the caller.

    Raises:
        ValueError: when `sub_path` is empty or the resolved target escapes
            the overlay root.
    """
    if sub_path == "":
        raise ValueError("delete requires a sub-path")
    validate_overlay_ref(sub_path)

    root = resolve_overlay_root(overlay_path_value).resolve()
    target = root / sub_path
    if _is_under(target.resolve(strict=False), root):
        return target
    raise ValueError("path escapes overlay root")
def safe_resolve_for_move(
    overlay_path_value: str, src: str, dst: str
) -> tuple[Path, Path]:
    """Validate a rename/move and return the lexical `(src, dst)` paths.

    Both paths are validated with `validate_overlay_ref` and must resolve to
    locations inside the overlay root.

    Raises:
        ValueError: when either argument is empty; when the resolved src or
            dst escapes the overlay root; when src does not exist (a
            dangling symlink counts as existing); when dst is an existing
            symlink; when dst's parent exists but is not a directory (a
            missing parent is not rejected here); or when src is a directory
            and dst resolves to src itself or to somewhere beneath it.
    """
    if "" in (src, dst):
        raise ValueError("move requires non-empty src and dst")
    validate_overlay_ref(src)
    validate_overlay_ref(dst)

    root = resolve_overlay_root(overlay_path_value).resolve()
    origin = root / src
    destination = root / dst
    origin_real = origin.resolve(strict=False)
    destination_real = destination.resolve(strict=False)

    if not _is_under(origin_real, root):
        raise ValueError("src escapes overlay root")
    if not _is_under(destination_real, root):
        raise ValueError("dst escapes overlay root")

    # `exists()` follows symlinks, so a dangling link needs the second test.
    if not (origin.exists() or origin.is_symlink()):
        raise ValueError("src does not exist")
    if destination.is_symlink():
        raise ValueError("refusing to overwrite an existing symlink at dst")

    holder = destination.parent
    if holder.exists() and not holder.is_dir():
        raise ValueError("dst parent is not a directory")

    # Cycle check: a directory must not be moved onto itself or into one of
    # its own descendants.
    if origin.is_dir() and _is_under(destination_real, origin_real):
        raise ValueError("cannot move a directory into itself")
    return origin, destination
_EDITABLE_MAX_BYTES = 1 * 1024 * 1024
_UTF8_SNIFF_BYTES = 8 * 1024


def is_editable(path: Path) -> bool:
    """Decide whether `path` may be opened in the in-browser text editor.

    True iff the file is a regular file (not a symlink), is at most 1 MiB,
    and its first 8 KiB contain no NUL bytes and decode as strict UTF-8.
    NULs decode as valid UTF-8 (U+0000) but render badly in textareas and
    almost always indicate binary content; rejecting them matches the "is
    this a text file" intuition users expect. The save endpoint is expected
    to enforce the same rules.

    When the file is longer than the sniffed sample, a multi-byte UTF-8
    sequence that is merely cut off by the end of the sample is tolerated;
    any genuinely invalid byte inside the sample still rejects the file.

    Any `OSError` while inspecting or reading the file yields False.
    """
    # Local import keeps this block self-contained; only needed here.
    import codecs

    try:
        if path.is_symlink() or not path.is_file():
            return False
        size = path.stat().st_size
    except OSError:
        return False
    if size > _EDITABLE_MAX_BYTES:
        return False

    try:
        with path.open("rb") as f:
            sample = f.read(_UTF8_SNIFF_BYTES)
    except OSError:
        return False
    if b"\x00" in sample:
        return False

    # If the whole file fit in the sample, a dangling partial sequence at
    # the end is a real error (final=True). Otherwise the sample boundary
    # may have split a valid character, so let the decoder buffer it.
    whole_file_read = size <= len(sample)
    decoder = codecs.getincrementaldecoder("utf-8")(errors="strict")
    try:
        decoder.decode(sample, final=whole_file_read)
    except UnicodeDecodeError:
        return False
    return True
def safe_resolve_for_server_listing(server_id: int, sub_path: str) -> Path | None:
    """Resolve `sub_path` inside `runtime/<server_id>/merged` for a listing.

    The merged directory is the composed view of a running server. If it is
    not currently a directory (for example the server has never started, or
    was just reset), None is returned before `sub_path` is even validated.
    An empty `sub_path` yields the merged root itself.

    Raises:
        ValueError: if the symlink-resolved path leaves the merged root.
    """
    runtime_dir = get_left4me_root() / "runtime" / str(server_id) / "merged"
    merged_root = runtime_dir.resolve()
    if not merged_root.is_dir():
        return None
    if sub_path == "":
        return merged_root

    validate_overlay_ref(sub_path)
    resolved = (merged_root / sub_path).resolve(strict=False)
    if _is_under(resolved, merged_root):
        return resolved
    raise ValueError("path escapes server merged root")
DEFAULT_MAX_ENTRIES = 500


def list_directory(
    target: Path,
    overlay_root: Path,
    *,
    max_entries: int | None = None,
) -> tuple[list[dict], int]:
    """Describe the immediate children of `target`.

    Returns `(entries, truncated_count)`. `entries` holds one row dict per
    child (built by `_entry_dict`), directories first and then files, each
    group ordered case-insensitively by name, cut off at the cap.
    `truncated_count` is how many rows were dropped by that cap (0 if none).

    `max_entries=None` means "use the module-level `DEFAULT_MAX_ENTRIES`",
    looked up at call time so tests can monkeypatch the cap without
    re-instantiating routes.
    """
    limit = DEFAULT_MAX_ENTRIES if max_entries is None else max_entries

    with os.scandir(target) as scan:
        described = [_entry_dict(child, overlay_root) for child in scan]

    # False sorts before True, so directories lead; sorted() is stable.
    ordered = sorted(
        described,
        key=lambda row: (row["kind"] != "dir", row["name"].casefold()),
    )
    dropped = max(len(ordered) - limit, 0)
    return ordered[:limit], dropped
def _entry_dict(entry: "os.DirEntry[str]", overlay_root: Path) -> dict:
    """Build the template row for one `os.scandir` entry.

    Keys: `name`, `rel` (path relative to `overlay_root`, `/`-joined),
    `kind` (`"dir"` or `"file"`, judged after following symlinks),
    `is_symlink`, `broken` (set when inspecting the entry raised `OSError`),
    `size` (bytes, None for directories and broken entries), `size_human`
    (formatted size, empty string when `size` is None) and `editable`
    (result of `is_editable`, always False for directories and broken
    entries).

    `Path.relative_to` raises `ValueError` if the entry is not located under
    `overlay_root`; that is not caught here.
    """
    entry_path = Path(entry.path)
    symlinked = entry.is_symlink()

    unreadable = False
    try:
        directory = entry.is_dir(follow_symlinks=True)
    except OSError:
        directory = False
        unreadable = True

    byte_count: int | None = None
    if not directory and not unreadable:
        # Only plain, inspectable files get a size; a failing stat (e.g. a
        # dangling symlink) marks the entry as broken instead.
        try:
            byte_count = entry.stat(follow_symlinks=True).st_size
        except OSError:
            unreadable = True

    relative = "/".join(entry_path.relative_to(overlay_root).parts)
    can_edit = (not directory) and (not unreadable) and is_editable(entry_path)

    return {
        "name": entry.name,
        "rel": relative,
        "kind": "dir" if directory else "file",
        "is_symlink": symlinked,
        "broken": unreadable,
        "size": byte_count,
        "size_human": "" if byte_count is None else _format_size(byte_count),
        "editable": can_edit,
    }
def _format_size(num: int) -> str:
if num < 1024:
return f"{num} B"
units = ["KB", "MB", "GB", "TB"]
value = float(num)
unit = "B"
for u in units:
value /= 1024.0
unit = u
if value < 1024:
break
return f"{value:.1f} {unit}"
def safe_resolve_for_download(overlay_path_value: str, sub_path: str) -> Path:
    """Resolve an overlay file for download and return its real path.

    Unlike the listing helper, the containment boundary here is
    `LEFT4ME_ROOT` rather than the overlay root: symlinks may point anywhere
    inside `LEFT4ME_ROOT` (so workshop addons stream from the shared cache),
    but nothing outside it is served. The returned path is fully
    symlink-resolved.

    Raises:
        ValueError: when `sub_path` is empty or the real path lies outside
            `LEFT4ME_ROOT`.
    """
    if sub_path == "":
        raise ValueError("download requires a file path")
    validate_overlay_ref(sub_path)

    root = resolve_overlay_root(overlay_path_value).resolve()
    real_path = Path(os.path.realpath(root / sub_path))
    boundary = get_left4me_root().resolve()
    if _is_under(real_path, boundary):
        return real_path
    raise ValueError("path escapes LEFT4ME_ROOT")
def safe_resolve_for_server_download(server_id: int, sub_path: str) -> Path | None:
    """Resolve a file under `runtime/<server_id>/merged` for download.

    Symlinks are followed (the merged view threads through `installation/`
    and overlay layers, all under LEFT4ME_ROOT) and the fully resolved path
    is returned, provided it stays inside `LEFT4ME_ROOT`. `sub_path` is
    validated first; after that, None is returned if the merged directory
    does not exist yet.

    Raises:
        ValueError: when `sub_path` is empty or the real path lies outside
            `LEFT4ME_ROOT`.
    """
    if sub_path == "":
        raise ValueError("download requires a file path")
    validate_overlay_ref(sub_path)

    runtime_dir = get_left4me_root() / "runtime" / str(server_id) / "merged"
    merged_root = runtime_dir.resolve()
    if not merged_root.is_dir():
        return None

    real_path = Path(os.path.realpath(merged_root / sub_path))
    boundary = get_left4me_root().resolve()
    if _is_under(real_path, boundary):
        return real_path
    raise ValueError("path escapes LEFT4ME_ROOT")