Adds ScriptBuilder that runs user-authored bash inside the
left4me-script-sandbox helper via run_command, with a 20 GB post-build
disk cap. Registry now {"workshop", "script"}.
finish_job writes Overlay.last_build_status on build_overlay completion.
Drops GlobalMapOverlayBuilder and the now-unreachable
_check_global_overlay_caches in l4d2_facade.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
359 lines
13 KiB
Python
359 lines
13 KiB
Python
"""Tests for overlay builders (registry, WorkshopBuilder, ScriptBuilder)."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import Overlay, OverlayWorkshopItem, User, WorkshopItem
|
|
from l4d2web.services import overlay_builders
|
|
from l4d2web.services.host_commands import CommandCancelledError, CommandResult
|
|
|
|
|
|
@pytest.fixture
|
|
def env(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'b.db'}")
|
|
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
|
init_db()
|
|
yield tmp_path
|
|
|
|
|
|
def _create_user_and_overlay(name: str, type_: str) -> tuple[int, int]:
|
|
with session_scope() as s:
|
|
user = User(username="alice", password_digest="x")
|
|
s.add(user)
|
|
s.flush()
|
|
overlay = Overlay(name=name, path=str(7), type=type_, user_id=user.id)
|
|
s.add(overlay)
|
|
s.flush()
|
|
return user.id, overlay.id
|
|
|
|
|
|
def _add_workshop_item(steam_id: str, *, downloaded: bool, cache_root: Path, content: bytes = b"x") -> int:
|
|
if downloaded:
|
|
cache_root.mkdir(parents=True, exist_ok=True)
|
|
(cache_root / f"{steam_id}.vpk").write_bytes(content)
|
|
with session_scope() as s:
|
|
wi = WorkshopItem(
|
|
steam_id=steam_id,
|
|
title=f"item-{steam_id}",
|
|
filename=f"orig-{steam_id}.vpk",
|
|
file_url=f"https://example.com/{steam_id}.vpk",
|
|
file_size=len(content) if downloaded else 0,
|
|
time_updated=1700000000 if downloaded else 0,
|
|
last_downloaded_at=datetime.now(UTC) if downloaded else None,
|
|
)
|
|
s.add(wi)
|
|
s.flush()
|
|
return wi.id
|
|
|
|
|
|
def _associate(overlay_id: int, item_id: int) -> None:
|
|
with session_scope() as s:
|
|
s.add(OverlayWorkshopItem(overlay_id=overlay_id, workshop_item_id=item_id))
|
|
|
|
|
|
def _capture_logs():
|
|
out: list[str] = []
|
|
err: list[str] = []
|
|
return out, err, out.append, err.append
|
|
|
|
|
|
def test_builders_registry() -> None:
|
|
assert set(overlay_builders.BUILDERS) == {"workshop", "script"}
|
|
|
|
|
|
def test_registry_excludes_legacy_types() -> None:
|
|
for legacy in ("external", "l4d2center_maps", "cedapug_maps"):
|
|
assert legacy not in overlay_builders.BUILDERS
|
|
|
|
|
|
def test_registry_unknown_type_raises_keyerror() -> None:
|
|
with pytest.raises(KeyError):
|
|
overlay_builders.BUILDERS["nope"]
|
|
|
|
|
|
def test_workshop_builder_unchanged() -> None:
|
|
"""Regression guard against accidental removal during refactor."""
|
|
builder = overlay_builders.BUILDERS["workshop"]
|
|
assert isinstance(builder, overlay_builders.WorkshopBuilder)
|
|
assert hasattr(builder, "build")
|
|
|
|
|
|
def test_workshop_builder_creates_absolute_symlinks(env: Path) -> None:
|
|
_, overlay_id = _create_user_and_overlay("ws", "workshop")
|
|
cache_root = env / "workshop_cache"
|
|
item_a = _add_workshop_item("1001", downloaded=True, cache_root=cache_root, content=b"AAA")
|
|
item_b = _add_workshop_item("1002", downloaded=True, cache_root=cache_root, content=b"BBBB")
|
|
_associate(overlay_id, item_a)
|
|
_associate(overlay_id, item_b)
|
|
|
|
out, err, on_stdout, on_stderr = _capture_logs()
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(
|
|
overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False
|
|
)
|
|
|
|
addons = env / "overlays" / "7" / "left4dead2" / "addons"
|
|
link_a = addons / "1001.vpk"
|
|
link_b = addons / "1002.vpk"
|
|
assert link_a.is_symlink()
|
|
assert link_b.is_symlink()
|
|
# Targets must be ABSOLUTE so they resolve in the host's namespace.
|
|
assert os.path.isabs(os.readlink(link_a))
|
|
assert os.path.isabs(os.readlink(link_b))
|
|
# And they must resolve to the cache files.
|
|
assert link_a.resolve() == (cache_root / "1001.vpk").resolve()
|
|
assert link_b.resolve() == (cache_root / "1002.vpk").resolve()
|
|
|
|
|
|
def test_workshop_builder_skips_uncached_items_with_warning(env: Path) -> None:
|
|
_, overlay_id = _create_user_and_overlay("ws", "workshop")
|
|
cache_root = env / "workshop_cache"
|
|
|
|
cached = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
|
|
uncached = _add_workshop_item("9999", downloaded=False, cache_root=cache_root)
|
|
_associate(overlay_id, cached)
|
|
_associate(overlay_id, uncached)
|
|
|
|
out, err, on_stdout, on_stderr = _capture_logs()
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(
|
|
overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False
|
|
)
|
|
|
|
addons = env / "overlays" / "7" / "left4dead2" / "addons"
|
|
assert (addons / "1001.vpk").is_symlink()
|
|
assert not (addons / "9999.vpk").exists(), "must NOT create dangling symlink"
|
|
assert any("9999" in line and ("skip" in line.lower() or "uncached" in line.lower()) for line in err + out), err + out
|
|
|
|
|
|
def test_workshop_builder_rerun_is_idempotent(env: Path) -> None:
|
|
_, overlay_id = _create_user_and_overlay("ws", "workshop")
|
|
cache_root = env / "workshop_cache"
|
|
item = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
|
|
_associate(overlay_id, item)
|
|
|
|
addons = env / "overlays" / "7" / "left4dead2" / "addons"
|
|
|
|
# First run.
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False)
|
|
first_inode = (addons / "1001.vpk").lstat().st_ino
|
|
|
|
# Second run — no-op.
|
|
out, err, on_stdout, on_stderr = _capture_logs()
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False)
|
|
|
|
second_inode = (addons / "1001.vpk").lstat().st_ino
|
|
assert first_inode == second_inode, "symlink should be untouched on idempotent rebuild"
|
|
assert any("unchanged" in line.lower() for line in out), out
|
|
|
|
|
|
def test_workshop_builder_removes_obsolete_symlinks(env: Path) -> None:
|
|
_, overlay_id = _create_user_and_overlay("ws", "workshop")
|
|
cache_root = env / "workshop_cache"
|
|
item_a = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
|
|
item_b = _add_workshop_item("1002", downloaded=True, cache_root=cache_root)
|
|
_associate(overlay_id, item_a)
|
|
_associate(overlay_id, item_b)
|
|
|
|
addons = env / "overlays" / "7" / "left4dead2" / "addons"
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False)
|
|
assert (addons / "1002.vpk").is_symlink()
|
|
|
|
# Remove the association for 1002.
|
|
with session_scope() as s:
|
|
s.query(OverlayWorkshopItem).filter_by(workshop_item_id=item_b).delete()
|
|
|
|
out, err, on_stdout, on_stderr = _capture_logs()
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False)
|
|
|
|
assert (addons / "1001.vpk").is_symlink()
|
|
assert not (addons / "1002.vpk").exists()
|
|
# Cache file must remain — overlays are diff-applied, cache is shared.
|
|
assert (cache_root / "1002.vpk").exists()
|
|
|
|
|
|
def test_workshop_builder_leaves_unrelated_files_alone(env: Path) -> None:
|
|
_, overlay_id = _create_user_and_overlay("ws", "workshop")
|
|
cache_root = env / "workshop_cache"
|
|
item = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
|
|
_associate(overlay_id, item)
|
|
|
|
addons = env / "overlays" / "7" / "left4dead2" / "addons"
|
|
addons.mkdir(parents=True, exist_ok=True)
|
|
(addons / "manual_addon.vpk").write_bytes(b"hand-placed")
|
|
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False)
|
|
|
|
# Manual file is preserved.
|
|
assert (addons / "manual_addon.vpk").read_bytes() == b"hand-placed"
|
|
# Workshop symlink is created alongside.
|
|
assert (addons / "1001.vpk").is_symlink()
|
|
|
|
|
|
def test_workshop_builder_honors_should_cancel(env: Path) -> None:
|
|
_, overlay_id = _create_user_and_overlay("ws", "workshop")
|
|
cache_root = env / "workshop_cache"
|
|
items = [_add_workshop_item(f"100{i}", downloaded=True, cache_root=cache_root) for i in range(3)]
|
|
for it in items:
|
|
_associate(overlay_id, it)
|
|
|
|
cancel_calls = {"n": 0}
|
|
|
|
def cancel():
|
|
cancel_calls["n"] += 1
|
|
return cancel_calls["n"] > 0 # cancel immediately
|
|
|
|
with session_scope() as s:
|
|
overlay = s.query(Overlay).filter_by(id=overlay_id).one()
|
|
# Should not crash; partial state is consistent (re-run heals).
|
|
overlay_builders.BUILDERS["workshop"].build(
|
|
overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=cancel
|
|
)
|
|
|
|
|
|
# --- ScriptBuilder ---------------------------------------------------------
|
|
|
|
def _script_overlay(*, id_: int = 42, script: str = "echo hi") -> SimpleNamespace:
|
|
return SimpleNamespace(id=id_, type="script", path=str(id_), script=script)
|
|
|
|
|
|
def test_script_builder_invokes_helper(env, monkeypatch) -> None:
|
|
captured: dict = {}
|
|
|
|
def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
|
|
captured["cmd"] = list(cmd)
|
|
captured["script_text"] = open(cmd[-1]).read()
|
|
captured["script_path_existed"] = os.path.exists(cmd[-1])
|
|
return CommandResult(returncode=0, stdout="", stderr="")
|
|
|
|
monkeypatch.setattr(overlay_builders, "run_command", fake_run)
|
|
monkeypatch.setattr(
|
|
overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None
|
|
)
|
|
|
|
overlay = _script_overlay()
|
|
overlay_builders.ScriptBuilder().build(
|
|
overlay,
|
|
on_stdout=lambda _x: None,
|
|
on_stderr=lambda _x: None,
|
|
should_cancel=lambda: False,
|
|
)
|
|
|
|
assert captured["cmd"][:4] == [
|
|
"sudo",
|
|
"-n",
|
|
"/usr/local/libexec/left4me/left4me-script-sandbox",
|
|
"42",
|
|
]
|
|
assert captured["script_text"] == "echo hi"
|
|
assert captured["script_path_existed"] is True
|
|
# Tmpfile is unlinked after build.
|
|
assert not os.path.exists(captured["cmd"][-1])
|
|
|
|
|
|
def test_script_builder_disk_cap(env, monkeypatch) -> None:
|
|
monkeypatch.setattr(
|
|
overlay_builders,
|
|
"run_command",
|
|
lambda *a, **kw: CommandResult(returncode=0, stdout="", stderr=""),
|
|
)
|
|
monkeypatch.setattr(
|
|
overlay_builders.subprocess,
|
|
"check_output",
|
|
lambda *a, **kw: b"25000000000\t/some/path\n",
|
|
)
|
|
|
|
err: list[str] = []
|
|
overlay = _script_overlay(script="")
|
|
|
|
with pytest.raises(overlay_builders.BuildError):
|
|
overlay_builders.ScriptBuilder().build(
|
|
overlay,
|
|
on_stdout=lambda _x: None,
|
|
on_stderr=err.append,
|
|
should_cancel=lambda: False,
|
|
)
|
|
|
|
assert any("20" in line and "GB" in line for line in err), err
|
|
|
|
|
|
def test_script_builder_streams_output(env, monkeypatch) -> None:
|
|
def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
|
|
on_stdout("hello")
|
|
on_stderr("warn")
|
|
return CommandResult(returncode=0, stdout="hello", stderr="warn")
|
|
|
|
monkeypatch.setattr(overlay_builders, "run_command", fake_run)
|
|
monkeypatch.setattr(
|
|
overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None
|
|
)
|
|
|
|
out: list[str] = []
|
|
err: list[str] = []
|
|
overlay = _script_overlay(script="")
|
|
overlay_builders.ScriptBuilder().build(
|
|
overlay, on_stdout=out.append, on_stderr=err.append, should_cancel=lambda: False
|
|
)
|
|
assert out == ["hello"]
|
|
assert err == ["warn"]
|
|
|
|
|
|
def test_script_builder_passes_should_cancel_through(env, monkeypatch) -> None:
|
|
captured: dict = {}
|
|
|
|
def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
|
|
captured["should_cancel"] = should_cancel
|
|
raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="")
|
|
|
|
monkeypatch.setattr(overlay_builders, "run_command", fake_run)
|
|
monkeypatch.setattr(
|
|
overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None
|
|
)
|
|
|
|
overlay = _script_overlay(script="")
|
|
with pytest.raises(CommandCancelledError):
|
|
overlay_builders.ScriptBuilder().build(
|
|
overlay,
|
|
on_stdout=lambda _x: None,
|
|
on_stderr=lambda _x: None,
|
|
should_cancel=lambda: True,
|
|
)
|
|
assert captured["should_cancel"]() is True
|
|
|
|
|
|
def test_script_builder_cleans_up_tmpfile_on_failure(env, monkeypatch) -> None:
|
|
captured: dict = {}
|
|
|
|
def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
|
|
captured["script_path"] = cmd[-1]
|
|
raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="")
|
|
|
|
monkeypatch.setattr(overlay_builders, "run_command", fake_run)
|
|
|
|
overlay = _script_overlay(script="")
|
|
with pytest.raises(CommandCancelledError):
|
|
overlay_builders.ScriptBuilder().build(
|
|
overlay,
|
|
on_stdout=lambda _x: None,
|
|
on_stderr=lambda _x: None,
|
|
should_cancel=lambda: False,
|
|
)
|
|
assert not os.path.exists(captured["script_path"])
|