Adds Overlay.type='files' whose source-of-truth IS the overlay directory
itself. Users can:
* upload arbitrary files / whole folders by dragging from the OS onto a
folder row in the file tree (one POST per file, queue with
concurrency 3, per-file progress in a floating Uploads panel)
* move via drag-and-drop inside the tree (same gesture, source
distinguishes; refuses cycles)
* create / edit / rename / replace through a single editor modal
(text flavor for editable files, binary flavor with replace-upload
for everything else; filename input is the rename surface)
* mkdir empty folders (slashes allowed for nested intermediates)
* stream a folder as a zip download
* delete files and empty folders
Backend is type-agnostic past the new files_routes endpoints, so the
existing mount / spec / overlayfs / expose_server_cfg pipeline is reused
unchanged. is_editable gates the row's edit affordance and the /save
content rules. Three new safe-resolve helpers (write/delete/move) cover
the new operations with the same anchor-and-resolve pattern as listing
and download. FilesBuilder is a no-op so the build subsystem can
dispatch uniformly.
Spec: docs/superpowers/specs/2026-05-09-files-overlay-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
281 lines
10 KiB
Python
281 lines
10 KiB
Python
"""Filesystem helpers backing the overlay 'Files' section.
|
|
|
|
`safe_resolve_for_listing` and `safe_resolve_for_download` translate a
|
|
user-supplied sub-path within an overlay into a real filesystem path,
|
|
rejecting anything that would escape the overlay root (listing) or
|
|
`LEFT4ME_ROOT` (download). `list_directory` walks one level and shapes
|
|
the rows the templates render.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from l4d2host.paths import get_left4me_root, overlay_path as resolve_overlay_root
|
|
|
|
from l4d2web.services.security import validate_overlay_ref
|
|
|
|
|
|
def _is_under(child: Path, parent: Path) -> bool:
|
|
return child == parent or parent in child.parents
|
|
|
|
|
|
def safe_resolve_for_listing(overlay_path_value: str, sub_path: str) -> Path:
|
|
"""Resolve `overlay_root / sub_path` and refuse anything that escapes the
|
|
overlay root after symlink resolution. Used to render directory listings."""
|
|
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
|
|
if sub_path == "":
|
|
return overlay_root
|
|
validate_overlay_ref(sub_path)
|
|
candidate = (overlay_root / sub_path).resolve(strict=False)
|
|
if not _is_under(candidate, overlay_root):
|
|
raise ValueError("path escapes overlay root")
|
|
return candidate
|
|
|
|
|
|
def safe_resolve_for_write(overlay_path_value: str, sub_path: str) -> Path:
|
|
"""Resolve a destination path for create / write / replace operations.
|
|
|
|
Refuses empty `sub_path`, escape via `..` or symlink target, refuses to
|
|
overwrite an existing symlink, and refuses a path whose parent resolves
|
|
to a non-directory. Returns the lexical (un-resolved) candidate so the
|
|
caller can write through to the user-visible path; the safety check is
|
|
done against the resolved variant.
|
|
"""
|
|
if sub_path == "":
|
|
raise ValueError("write requires a sub-path")
|
|
validate_overlay_ref(sub_path)
|
|
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
|
|
candidate = overlay_root / sub_path
|
|
resolved_candidate = candidate.resolve(strict=False)
|
|
if not _is_under(resolved_candidate, overlay_root):
|
|
raise ValueError("path escapes overlay root")
|
|
if candidate.is_symlink():
|
|
raise ValueError("refusing to overwrite an existing symlink")
|
|
parent = candidate.parent
|
|
if parent.exists() and not parent.is_dir():
|
|
raise ValueError("parent path is not a directory")
|
|
return candidate
|
|
|
|
|
|
def safe_resolve_for_delete(overlay_path_value: str, sub_path: str) -> Path:
|
|
"""Resolve a path the user wants to delete. Same root-escape rules as
|
|
`safe_resolve_for_listing`. Allows files and (empty) directories; the
|
|
caller is responsible for refusing recursive directory removal."""
|
|
if sub_path == "":
|
|
raise ValueError("delete requires a sub-path")
|
|
validate_overlay_ref(sub_path)
|
|
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
|
|
candidate = overlay_root / sub_path
|
|
resolved_candidate = candidate.resolve(strict=False)
|
|
if not _is_under(resolved_candidate, overlay_root):
|
|
raise ValueError("path escapes overlay root")
|
|
return candidate
|
|
|
|
|
|
def safe_resolve_for_move(
|
|
overlay_path_value: str, src: str, dst: str
|
|
) -> tuple[Path, Path]:
|
|
"""Resolve src + dst for a rename/move. Both must be inside the overlay
|
|
root. Refuses cycle (dst inside src), missing src, missing/non-dir parent
|
|
of dst, and overwriting a symlink at dst."""
|
|
if src == "" or dst == "":
|
|
raise ValueError("move requires non-empty src and dst")
|
|
validate_overlay_ref(src)
|
|
validate_overlay_ref(dst)
|
|
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
|
|
src_path = overlay_root / src
|
|
dst_path = overlay_root / dst
|
|
src_resolved = src_path.resolve(strict=False)
|
|
dst_resolved = dst_path.resolve(strict=False)
|
|
if not _is_under(src_resolved, overlay_root):
|
|
raise ValueError("src escapes overlay root")
|
|
if not _is_under(dst_resolved, overlay_root):
|
|
raise ValueError("dst escapes overlay root")
|
|
if not src_path.exists() and not src_path.is_symlink():
|
|
raise ValueError("src does not exist")
|
|
if dst_path.is_symlink():
|
|
raise ValueError("refusing to overwrite an existing symlink at dst")
|
|
dst_parent = dst_path.parent
|
|
if dst_parent.exists() and not dst_parent.is_dir():
|
|
raise ValueError("dst parent is not a directory")
|
|
if src_path.is_dir():
|
|
# Cycle check: dst must not be src or any descendant of src.
|
|
if dst_resolved == src_resolved or src_resolved in dst_resolved.parents:
|
|
raise ValueError("cannot move a directory into itself")
|
|
return src_path, dst_path
|
|
|
|
|
|
_EDITABLE_MAX_BYTES = 1 * 1024 * 1024
|
|
_UTF8_SNIFF_BYTES = 8 * 1024
|
|
|
|
|
|
def is_editable(path: Path) -> bool:
|
|
"""True iff the file is a regular file (not symlink), ≤ 1 MiB, the first
|
|
8 KiB decodes as strict UTF-8, and contains no NUL bytes. NULs decode as
|
|
valid UTF-8 (U+0000) but render badly in textareas and almost always
|
|
indicate binary content; rejecting them matches the "is this a text
|
|
file" intuition users expect. Save endpoint enforces the same rules."""
|
|
try:
|
|
if path.is_symlink():
|
|
return False
|
|
if not path.is_file():
|
|
return False
|
|
size = path.stat().st_size
|
|
except OSError:
|
|
return False
|
|
if size > _EDITABLE_MAX_BYTES:
|
|
return False
|
|
try:
|
|
with path.open("rb") as f:
|
|
sample = f.read(_UTF8_SNIFF_BYTES)
|
|
except OSError:
|
|
return False
|
|
if b"\x00" in sample:
|
|
return False
|
|
try:
|
|
sample.decode("utf-8", errors="strict")
|
|
except UnicodeDecodeError:
|
|
return False
|
|
return True
|
|
|
|
|
|
def safe_resolve_for_server_listing(server_id: int, sub_path: str) -> Path | None:
|
|
"""Resolve a path inside `runtime/<server_id>/merged` (the kernel-overlayfs
|
|
composed view of a running server). Returns None if the merged dir doesn't
|
|
exist yet (server has never started, or was just reset). Refuses paths
|
|
that escape the merged root after symlink resolution.
|
|
"""
|
|
merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve()
|
|
if not merged_root.is_dir():
|
|
return None
|
|
if sub_path == "":
|
|
return merged_root
|
|
validate_overlay_ref(sub_path)
|
|
candidate = (merged_root / sub_path).resolve(strict=False)
|
|
if not _is_under(candidate, merged_root):
|
|
raise ValueError("path escapes server merged root")
|
|
return candidate
|
|
|
|
|
|
DEFAULT_MAX_ENTRIES = 500
|
|
|
|
|
|
def list_directory(
|
|
target: Path,
|
|
overlay_root: Path,
|
|
*,
|
|
max_entries: int | None = None,
|
|
) -> tuple[list[dict], int]:
|
|
"""List one directory level. Returns (entries, truncated_count) where
|
|
truncated_count is the number of entries elided after the cap.
|
|
Directories come first, then files; both case-insensitive alphabetical.
|
|
|
|
`max_entries` falls back to module-level `DEFAULT_MAX_ENTRIES` at call
|
|
time so tests can monkeypatch the cap without re-instantiating routes."""
|
|
cap = max_entries if max_entries is not None else DEFAULT_MAX_ENTRIES
|
|
|
|
rows: list[dict] = []
|
|
with os.scandir(target) as it:
|
|
for entry in it:
|
|
rows.append(_entry_dict(entry, overlay_root))
|
|
|
|
rows.sort(key=lambda r: (0 if r["kind"] == "dir" else 1, r["name"].casefold()))
|
|
|
|
if len(rows) > cap:
|
|
truncated = len(rows) - cap
|
|
rows = rows[:cap]
|
|
else:
|
|
truncated = 0
|
|
return rows, truncated
|
|
|
|
|
|
def _entry_dict(entry: "os.DirEntry[str]", overlay_root: Path) -> dict:
|
|
is_symlink = entry.is_symlink()
|
|
broken = False
|
|
try:
|
|
is_dir = entry.is_dir(follow_symlinks=True)
|
|
except OSError:
|
|
is_dir = False
|
|
broken = True
|
|
|
|
if is_dir:
|
|
kind = "dir"
|
|
size: int | None = None
|
|
else:
|
|
kind = "file"
|
|
if broken:
|
|
size = None
|
|
else:
|
|
try:
|
|
size = entry.stat(follow_symlinks=True).st_size
|
|
except OSError:
|
|
size = None
|
|
broken = True
|
|
|
|
rel_str = "/".join(Path(entry.path).relative_to(overlay_root).parts)
|
|
|
|
if kind == "file" and not broken:
|
|
editable = is_editable(Path(entry.path))
|
|
else:
|
|
editable = False
|
|
|
|
return {
|
|
"name": entry.name,
|
|
"rel": rel_str,
|
|
"kind": kind,
|
|
"is_symlink": is_symlink,
|
|
"broken": broken,
|
|
"size": size,
|
|
"size_human": _format_size(size) if size is not None else "",
|
|
"editable": editable,
|
|
}
|
|
|
|
|
|
def _format_size(num: int) -> str:
|
|
if num < 1024:
|
|
return f"{num} B"
|
|
units = ["KB", "MB", "GB", "TB"]
|
|
value = float(num)
|
|
unit = "B"
|
|
for u in units:
|
|
value /= 1024.0
|
|
unit = u
|
|
if value < 1024:
|
|
break
|
|
return f"{value:.1f} {unit}"
|
|
|
|
|
|
def safe_resolve_for_download(overlay_path_value: str, sub_path: str) -> Path:
|
|
"""Resolve a file path the user wants to download. Allows symlink targets
|
|
anywhere inside `LEFT4ME_ROOT` (so workshop addons stream from the shared
|
|
cache) but blocks any escape outside it."""
|
|
if sub_path == "":
|
|
raise ValueError("download requires a file path")
|
|
validate_overlay_ref(sub_path)
|
|
overlay_root = resolve_overlay_root(overlay_path_value).resolve()
|
|
candidate = overlay_root / sub_path
|
|
real = Path(os.path.realpath(candidate))
|
|
left4me_root = get_left4me_root().resolve()
|
|
if not _is_under(real, left4me_root):
|
|
raise ValueError("path escapes LEFT4ME_ROOT")
|
|
return real
|
|
|
|
|
|
def safe_resolve_for_server_download(server_id: int, sub_path: str) -> Path | None:
|
|
"""Resolve a file inside `runtime/<server_id>/merged` for download. Follows
|
|
symlinks (the merged view threads through `installation/` and overlay
|
|
layers, all under LEFT4ME_ROOT). Refuses anything escaping LEFT4ME_ROOT.
|
|
Returns None when the merged dir doesn't exist yet."""
|
|
if sub_path == "":
|
|
raise ValueError("download requires a file path")
|
|
validate_overlay_ref(sub_path)
|
|
merged_root = (get_left4me_root() / "runtime" / str(server_id) / "merged").resolve()
|
|
if not merged_root.is_dir():
|
|
return None
|
|
candidate = merged_root / sub_path
|
|
real = Path(os.path.realpath(candidate))
|
|
left4me_root = get_left4me_root().resolve()
|
|
if not _is_under(real, left4me_root):
|
|
raise ValueError("path escapes LEFT4ME_ROOT")
|
|
return real
|