left4me/l4d2web/tests/test_overlay_builders.py
mwiegand 2d3c98866a
feat(files-overlay): user-managed file content as a third overlay type
Adds Overlay.type='files' whose source-of-truth IS the overlay directory
itself. Users can:

  * upload arbitrary files / whole folders by dragging from the OS onto a
    folder row in the file tree (one POST per file, queue with
    concurrency 3, per-file progress in a floating Uploads panel)
  * move via drag-and-drop inside the tree (same gesture, source
    distinguishes; refuses cycles)
  * create / edit / rename / replace through a single editor modal
    (text flavor for editable files, binary flavor with replace-upload
    for everything else; filename input is the rename surface)
  * mkdir empty folders (slashes allowed for nested intermediates)
  * stream a folder as a zip download
  * delete files and empty folders

Backend is type-agnostic past the new files_routes endpoints, so the
existing mount / spec / overlayfs / expose_server_cfg pipeline is reused
unchanged. is_editable gates the row's edit affordance and the /save
content rules. Three new safe-resolve helpers (write/delete/move) cover
the new operations with the same anchor-and-resolve pattern as listing
and download. FilesBuilder is a no-op so the build subsystem can
dispatch uniformly.

Spec: docs/superpowers/specs/2026-05-09-files-overlay-design.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 18:59:32 +02:00

382 lines
14 KiB
Python

"""Tests for overlay builders (registry, WorkshopBuilder, ScriptBuilder)."""
from __future__ import annotations
import os
from datetime import UTC, datetime
from pathlib import Path
from types import SimpleNamespace
import pytest
from l4d2web.db import init_db, session_scope
from l4d2web.models import Overlay, OverlayWorkshopItem, User, WorkshopItem
from l4d2web.services import overlay_builders
from l4d2web.services.host_commands import CommandCancelledError, CommandResult
@pytest.fixture
def env(tmp_path, monkeypatch):
    """Yield ``tmp_path`` configured as an isolated application root.

    ``DATABASE_URL`` is pointed at a SQLite file ``b.db`` inside ``tmp_path``
    and ``LEFT4ME_ROOT`` at ``tmp_path`` itself; ``init_db()`` is then called
    before the path is handed to the test.
    """
    database_file = tmp_path / "b.db"
    settings = (
        ("DATABASE_URL", f"sqlite:///{database_file}"),
        ("LEFT4ME_ROOT", str(tmp_path)),
    )
    for variable, value in settings:
        monkeypatch.setenv(variable, value)
    init_db()
    yield tmp_path
def _create_user_and_overlay(name: str, type_: str) -> tuple[int, int]:
    """Insert a user and one overlay owned by it; return ``(user_id, overlay_id)``.

    The user is always named ``alice`` and the overlay's ``path`` is always
    ``"7"``; only the overlay ``name`` and ``type`` come from the arguments.
    """
    with session_scope() as session:
        owner = User(username="alice", password_digest="x")
        session.add(owner)
        # Flush so owner.id is populated before the overlay references it.
        session.flush()
        created = Overlay(name=name, path="7", type=type_, user_id=owner.id)
        session.add(created)
        session.flush()
        # Read the ids while the session is still open.
        ids = (owner.id, created.id)
    return ids
def _add_workshop_item(steam_id: str, *, downloaded: bool, cache_root: Path, content: bytes = b"x") -> int:
    """Insert a ``WorkshopItem`` row and return its id.

    When ``downloaded`` is true, ``content`` is also written to
    ``cache_root/<steam_id>.vpk`` and the row records the content length, a
    fixed ``time_updated`` and the current UTC time as ``last_downloaded_at``.
    Otherwise no file is written and those fields are ``0``, ``0`` and ``None``.
    """
    if downloaded:
        cache_root.mkdir(parents=True, exist_ok=True)
        cached_file = cache_root / f"{steam_id}.vpk"
        cached_file.write_bytes(content)
        size, updated, fetched_at = len(content), 1700000000, datetime.now(UTC)
    else:
        size, updated, fetched_at = 0, 0, None

    with session_scope() as session:
        row = WorkshopItem(
            steam_id=steam_id,
            title=f"item-{steam_id}",
            filename=f"orig-{steam_id}.vpk",
            file_url=f"https://example.com/{steam_id}.vpk",
            file_size=size,
            time_updated=updated,
            last_downloaded_at=fetched_at,
        )
        session.add(row)
        # Flush so the primary key is assigned before it is read.
        session.flush()
        row_id = row.id
    return row_id
def _associate(overlay_id: int, item_id: int) -> None:
    """Add an ``OverlayWorkshopItem`` row linking ``item_id`` to ``overlay_id``."""
    link = OverlayWorkshopItem(overlay_id=overlay_id, workshop_item_id=item_id)
    with session_scope() as session:
        session.add(link)
def _capture_logs():
    """Return ``(out, err, on_stdout, on_stderr)`` for capturing builder output.

    ``on_stdout`` and ``on_stderr`` are the bound ``append`` methods of the
    two freshly created lists, so every line passed to a callback shows up
    in the matching list.
    """
    stdout_lines: list[str] = []
    stderr_lines: list[str] = []
    return stdout_lines, stderr_lines, stdout_lines.append, stderr_lines.append
def test_builders_registry() -> None:
    """The registry's keys are exactly ``workshop``, ``script`` and ``files``."""
    expected_types = {"workshop", "script", "files"}
    registered_types = set(overlay_builders.BUILDERS)
    assert registered_types == expected_types
def test_registry_excludes_legacy_types() -> None:
    """None of the legacy overlay type names is present in the registry."""
    legacy_types = ("external", "l4d2center_maps", "cedapug_maps")
    still_registered = [name for name in legacy_types if name in overlay_builders.BUILDERS]
    assert not still_registered
def test_files_builder_is_idempotent_no_op(monkeypatch, tmp_path) -> None:
    """Building a files overlay twice keeps user content intact.

    The first build must leave ``overlays/42`` existing as a directory; a
    file written into it afterwards must survive a second build unchanged,
    and nothing may be reported on stderr.
    """
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    fixture = type("O", (), {"id": 42, "name": "files-fixture"})()
    _stdout_lines, stderr_lines, on_stdout, on_stderr = _capture_logs()

    def run_build() -> None:
        overlay_builders.BUILDERS["files"].build(
            fixture,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    run_build()
    target_dir = tmp_path / "overlays" / "42"
    assert target_dir.is_dir()

    marker = target_dir / "kept.txt"
    marker.write_text("preserved")

    run_build()
    assert marker.read_text() == "preserved"
    assert stderr_lines == []
def test_registry_unknown_type_raises_keyerror() -> None:
    """Looking up an unregistered overlay type raises ``KeyError``."""
    registry = overlay_builders.BUILDERS
    with pytest.raises(KeyError):
        registry["nope"]
def test_workshop_builder_unchanged() -> None:
    """Guard that the ``workshop`` entry is still a ``WorkshopBuilder`` with ``build``."""
    registered = overlay_builders.BUILDERS["workshop"]
    expected_class = overlay_builders.WorkshopBuilder
    assert isinstance(registered, expected_class)
    assert hasattr(registered, "build")
def test_workshop_builder_creates_absolute_symlinks(env: Path) -> None:
    """Each associated, cached item gets a symlink with an absolute target.

    After the build, ``overlays/7/left4dead2/addons/<steam_id>.vpk`` must be a
    symlink whose raw target is absolute and which resolves to the matching
    file under the workshop cache.
    """
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    payloads = (("1001", b"AAA"), ("1002", b"BBBB"))
    for steam_id, payload in payloads:
        item_id = _add_workshop_item(steam_id, downloaded=True, cache_root=cache_root, content=payload)
        _associate(overlay_id, item_id)

    _stdout_lines, _stderr_lines, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    for steam_id, _payload in payloads:
        link = addons / f"{steam_id}.vpk"
        assert link.is_symlink()
        # Targets must be ABSOLUTE so they resolve in the host's namespace.
        assert os.path.isabs(os.readlink(link))
        # And they must resolve to the cache files.
        assert link.resolve() == (cache_root / f"{steam_id}.vpk").resolve()
def test_workshop_builder_skips_uncached_items_with_warning(env: Path) -> None:
    """An item without a cache file is skipped and mentioned in the logs.

    The cached item must be linked, the uncached one must not appear in the
    addons directory at all, and some log line must name ``9999`` together
    with "skip" or "uncached" (case-insensitive).
    """
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    present_id = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
    missing_id = _add_workshop_item("9999", downloaded=False, cache_root=cache_root)
    _associate(overlay_id, present_id)
    _associate(overlay_id, missing_id)

    stdout_lines, stderr_lines, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    assert (addons / "1001.vpk").is_symlink()
    assert not (addons / "9999.vpk").exists(), "must NOT create dangling symlink"

    def reports_skip(line: str) -> bool:
        lowered = line.lower()
        return "9999" in line and ("skip" in lowered or "uncached" in lowered)

    log_lines = stderr_lines + stdout_lines
    assert any(reports_skip(line) for line in log_lines), log_lines
def test_workshop_builder_rerun_is_idempotent(env: Path) -> None:
    """A second build leaves the existing symlink in place and says so.

    The symlink's inode (via ``lstat``) must be the same after both runs, and
    the second run must emit a stdout line containing "unchanged".
    """
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    item_id = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
    _associate(overlay_id, item_id)
    link = env / "overlays" / "7" / "left4dead2" / "addons" / "1001.vpk"

    def discard(_line):
        return None

    def build_once(on_stdout, on_stderr) -> int:
        """Run the workshop build and return the symlink's inode afterwards."""
        with session_scope() as session:
            overlay = session.query(Overlay).filter_by(id=overlay_id).one()
            overlay_builders.BUILDERS["workshop"].build(
                overlay,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=lambda: False,
            )
        return link.lstat().st_ino

    # First run.
    first_inode = build_once(discard, discard)

    # Second run — no-op.
    stdout_lines, _stderr_lines, on_stdout, on_stderr = _capture_logs()
    second_inode = build_once(on_stdout, on_stderr)

    assert first_inode == second_inode, "symlink should be untouched on idempotent rebuild"
    assert any("unchanged" in line.lower() for line in stdout_lines), stdout_lines
def test_workshop_builder_removes_obsolete_symlinks(env: Path) -> None:
    """Rebuilding after an item is disassociated removes only its symlink.

    The remaining item's link must still be present, the removed item's link
    must be gone, and the removed item's file in the cache must still exist.
    """
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    kept_id = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
    dropped_id = _add_workshop_item("1002", downloaded=True, cache_root=cache_root)
    for item_id in (kept_id, dropped_id):
        _associate(overlay_id, item_id)
    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    kept_link = addons / "1001.vpk"
    dropped_link = addons / "1002.vpk"

    def discard(_line):
        return None

    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay, on_stdout=discard, on_stderr=discard, should_cancel=lambda: False
        )
    assert dropped_link.is_symlink()

    # Remove the association for 1002.
    with session_scope() as session:
        session.query(OverlayWorkshopItem).filter_by(workshop_item_id=dropped_id).delete()

    _stdout_lines, _stderr_lines, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False
        )

    assert kept_link.is_symlink()
    assert not dropped_link.exists()
    # Cache file must remain — overlays are diff-applied, cache is shared.
    assert (cache_root / "1002.vpk").exists()
def test_workshop_builder_leaves_unrelated_files_alone(env: Path) -> None:
    """A hand-placed file in the addons directory survives a build.

    The pre-existing ``manual_addon.vpk`` must keep its bytes, and the
    workshop item's symlink must be created next to it.
    """
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    item_id = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
    _associate(overlay_id, item_id)

    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    addons.mkdir(parents=True, exist_ok=True)
    manual_file = addons / "manual_addon.vpk"
    manual_file.write_bytes(b"hand-placed")

    def discard(_line):
        return None

    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay, on_stdout=discard, on_stderr=discard, should_cancel=lambda: False
        )

    # Manual file is preserved.
    assert manual_file.read_bytes() == b"hand-placed"
    # Workshop symlink is created alongside.
    assert (addons / "1001.vpk").is_symlink()
def test_workshop_builder_honors_should_cancel(env: Path) -> None:
    """A build whose ``should_cancel`` always answers true must not raise.

    Three cached items are associated with the overlay; the test passes as
    long as the build call returns without an exception.
    """
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    for index in range(3):
        item_id = _add_workshop_item(f"100{index}", downloaded=True, cache_root=cache_root)
        _associate(overlay_id, item_id)

    polls = 0

    def cancel():
        nonlocal polls
        polls += 1
        return polls > 0  # cancel immediately

    def discard(_line):
        return None

    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        # Should not crash; partial state is consistent (re-run heals).
        overlay_builders.BUILDERS["workshop"].build(
            overlay, on_stdout=discard, on_stderr=discard, should_cancel=cancel
        )
# --- ScriptBuilder ---------------------------------------------------------
def _script_overlay(*, id_: int = 42, script: str = "echo hi") -> SimpleNamespace:
    """Build a lightweight stand-in for a script overlay.

    The namespace carries ``id``, ``type`` (always ``"script"``), ``path``
    (the id rendered as a string) and ``script``.
    """
    attributes = {
        "id": id_,
        "type": "script",
        "path": str(id_),
        "script": script,
    }
    return SimpleNamespace(**attributes)
def test_script_builder_invokes_helper(env, monkeypatch) -> None:
    """ScriptBuilder runs the sandbox helper with the script in a temp file.

    ``run_command`` is replaced by a fake that records the command line and
    inspects the script file named by the last argument. The test then checks
    that the command starts with the sudo/sandbox/overlay-id prefix, that the
    file existed and held the overlay's script while the command ran, and
    that the file is gone once ``build`` has returned.
    """
    captured: dict = {}

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        script_path = Path(cmd[-1])
        captured["cmd"] = list(cmd)
        # Probe existence before reading so a missing file is reported by the
        # assertion below instead of an exception raised in here.
        captured["script_path_existed"] = script_path.exists()
        # read_text() closes the handle; a bare open().read() leaked it.
        captured["script_text"] = (
            script_path.read_text() if captured["script_path_existed"] else None
        )
        return CommandResult(returncode=0, stdout="", stderr="")

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    # The disk budget check is not under test here.
    monkeypatch.setattr(
        overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None
    )
    overlay = _script_overlay()
    overlay_builders.ScriptBuilder().build(
        overlay,
        on_stdout=lambda _x: None,
        on_stderr=lambda _x: None,
        should_cancel=lambda: False,
    )
    assert captured["cmd"][:4] == [
        "sudo",
        "-n",
        "/usr/local/libexec/left4me/left4me-script-sandbox",
        "42",
    ]
    assert captured["script_path_existed"] is True
    assert captured["script_text"] == "echo hi"
    # Tmpfile is unlinked after build.
    assert not os.path.exists(captured["cmd"][-1])
def test_script_builder_disk_cap(env, monkeypatch) -> None:
    """An oversized usage report makes the build raise ``BuildError``.

    ``run_command`` is stubbed to succeed and ``subprocess.check_output`` to
    return a ``25000000000`` byte usage line. The build must raise
    ``BuildError`` and emit a stderr line containing both "20" and "GB".
    """

    def succeed(*_args, **_kwargs):
        return CommandResult(returncode=0, stdout="", stderr="")

    def oversized_usage(*_args, **_kwargs):
        return b"25000000000\t/some/path\n"

    monkeypatch.setattr(overlay_builders, "run_command", succeed)
    monkeypatch.setattr(overlay_builders.subprocess, "check_output", oversized_usage)

    stderr_lines: list[str] = []
    overlay = _script_overlay(script="")
    builder = overlay_builders.ScriptBuilder()
    with pytest.raises(overlay_builders.BuildError):
        builder.build(
            overlay,
            on_stdout=lambda _x: None,
            on_stderr=stderr_lines.append,
            should_cancel=lambda: False,
        )
    assert any("20" in line and "GB" in line for line in stderr_lines), stderr_lines
def test_script_builder_streams_output(env, monkeypatch) -> None:
    """Lines the command emits reach the caller's stdout/stderr callbacks.

    The fake ``run_command`` sends one line to each callback; afterwards the
    capturing lists must hold exactly those lines.
    """

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        on_stdout("hello")
        on_stderr("warn")
        return CommandResult(returncode=0, stdout="hello", stderr="warn")

    def skip_budget(*_args, **_kwargs):
        return None

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    monkeypatch.setattr(overlay_builders.ScriptBuilder, "_enforce_disk_budget", skip_budget)

    stdout_lines: list[str] = []
    stderr_lines: list[str] = []
    overlay_builders.ScriptBuilder().build(
        _script_overlay(script=""),
        on_stdout=stdout_lines.append,
        on_stderr=stderr_lines.append,
        should_cancel=lambda: False,
    )
    assert stdout_lines == ["hello"]
    assert stderr_lines == ["warn"]
def test_script_builder_passes_should_cancel_through(env, monkeypatch) -> None:
    """``should_cancel`` reaches ``run_command`` and cancellation propagates.

    The fake ``run_command`` stores the callable it receives and raises
    ``CommandCancelledError``; the build must re-raise that error and the
    stored callable must return ``True`` like the one passed in.
    """
    seen: dict = {}

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        seen["should_cancel"] = should_cancel
        raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="")

    def skip_budget(*_args, **_kwargs):
        return None

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    monkeypatch.setattr(overlay_builders.ScriptBuilder, "_enforce_disk_budget", skip_budget)

    builder = overlay_builders.ScriptBuilder()
    with pytest.raises(CommandCancelledError):
        builder.build(
            _script_overlay(script=""),
            on_stdout=lambda _x: None,
            on_stderr=lambda _x: None,
            should_cancel=lambda: True,
        )
    forwarded = seen["should_cancel"]
    assert forwarded() is True
def test_script_builder_cleans_up_tmpfile_on_failure(env, monkeypatch) -> None:
    """The script file is removed even when the command raises.

    The fake ``run_command`` records the path given as the last argument and
    raises ``CommandCancelledError``; once the error has propagated, that
    path must no longer exist.
    """
    seen: dict = {}

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        seen["script_path"] = cmd[-1]
        raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="")

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)

    builder = overlay_builders.ScriptBuilder()
    with pytest.raises(CommandCancelledError):
        builder.build(
            _script_overlay(script=""),
            on_stdout=lambda _x: None,
            on_stderr=lambda _x: None,
            should_cancel=lambda: False,
        )
    leftover = seen["script_path"]
    assert not os.path.exists(leftover)