"""Tests for overlay builders (registry, WorkshopBuilder, ScriptBuilder).""" from __future__ import annotations import os from datetime import UTC, datetime from pathlib import Path from types import SimpleNamespace import pytest from l4d2web.db import init_db, session_scope from l4d2web.models import Overlay, OverlayWorkshopItem, User, WorkshopItem from l4d2web.services import overlay_builders from l4d2web.services.host_commands import CommandCancelledError, CommandResult @pytest.fixture def env(tmp_path, monkeypatch): monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'b.db'}") monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path)) init_db() yield tmp_path def _create_user_and_overlay(name: str, type_: str) -> tuple[int, int]: with session_scope() as s: user = User(username="alice", password_digest="x") s.add(user) s.flush() overlay = Overlay(name=name, path=str(7), type=type_, user_id=user.id) s.add(overlay) s.flush() return user.id, overlay.id def _add_workshop_item(steam_id: str, *, downloaded: bool, cache_root: Path, content: bytes = b"x") -> int: if downloaded: cache_root.mkdir(parents=True, exist_ok=True) (cache_root / f"{steam_id}.vpk").write_bytes(content) with session_scope() as s: wi = WorkshopItem( steam_id=steam_id, title=f"item-{steam_id}", filename=f"orig-{steam_id}.vpk", file_url=f"https://example.com/{steam_id}.vpk", file_size=len(content) if downloaded else 0, time_updated=1700000000 if downloaded else 0, last_downloaded_at=datetime.now(UTC) if downloaded else None, ) s.add(wi) s.flush() return wi.id def _associate(overlay_id: int, item_id: int) -> None: with session_scope() as s: s.add(OverlayWorkshopItem(overlay_id=overlay_id, workshop_item_id=item_id)) def _capture_logs(): out: list[str] = [] err: list[str] = [] return out, err, out.append, err.append def test_builders_registry() -> None: assert set(overlay_builders.BUILDERS) == {"workshop", "script"} def test_registry_excludes_legacy_types() -> None: for legacy in ("external", "l4d2center_maps", "cedapug_maps"): assert legacy not in overlay_builders.BUILDERS def test_registry_unknown_type_raises_keyerror() -> None: with pytest.raises(KeyError): overlay_builders.BUILDERS["nope"] def test_workshop_builder_unchanged() -> None: """Regression guard against accidental removal during refactor.""" builder = overlay_builders.BUILDERS["workshop"] assert isinstance(builder, overlay_builders.WorkshopBuilder) assert hasattr(builder, "build") def test_workshop_builder_creates_absolute_symlinks(env: Path) -> None: _, overlay_id = _create_user_and_overlay("ws", "workshop") cache_root = env / "workshop_cache" item_a = _add_workshop_item("1001", downloaded=True, cache_root=cache_root, content=b"AAA") item_b = _add_workshop_item("1002", downloaded=True, cache_root=cache_root, content=b"BBBB") _associate(overlay_id, item_a) _associate(overlay_id, item_b) out, err, on_stdout, on_stderr = _capture_logs() with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build( overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False ) addons = env / "overlays" / "7" / "left4dead2" / "addons" link_a = addons / "1001.vpk" link_b = addons / "1002.vpk" assert link_a.is_symlink() assert link_b.is_symlink() # Targets must be ABSOLUTE so they resolve in the host's namespace. assert os.path.isabs(os.readlink(link_a)) assert os.path.isabs(os.readlink(link_b)) # And they must resolve to the cache files. assert link_a.resolve() == (cache_root / "1001.vpk").resolve() assert link_b.resolve() == (cache_root / "1002.vpk").resolve() def test_workshop_builder_skips_uncached_items_with_warning(env: Path) -> None: _, overlay_id = _create_user_and_overlay("ws", "workshop") cache_root = env / "workshop_cache" cached = _add_workshop_item("1001", downloaded=True, cache_root=cache_root) uncached = _add_workshop_item("9999", downloaded=False, cache_root=cache_root) _associate(overlay_id, cached) _associate(overlay_id, uncached) out, err, on_stdout, on_stderr = _capture_logs() with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build( overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False ) addons = env / "overlays" / "7" / "left4dead2" / "addons" assert (addons / "1001.vpk").is_symlink() assert not (addons / "9999.vpk").exists(), "must NOT create dangling symlink" assert any("9999" in line and ("skip" in line.lower() or "uncached" in line.lower()) for line in err + out), err + out def test_workshop_builder_rerun_is_idempotent(env: Path) -> None: _, overlay_id = _create_user_and_overlay("ws", "workshop") cache_root = env / "workshop_cache" item = _add_workshop_item("1001", downloaded=True, cache_root=cache_root) _associate(overlay_id, item) addons = env / "overlays" / "7" / "left4dead2" / "addons" # First run. with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False) first_inode = (addons / "1001.vpk").lstat().st_ino # Second run — no-op. out, err, on_stdout, on_stderr = _capture_logs() with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False) second_inode = (addons / "1001.vpk").lstat().st_ino assert first_inode == second_inode, "symlink should be untouched on idempotent rebuild" assert any("unchanged" in line.lower() for line in out), out def test_workshop_builder_removes_obsolete_symlinks(env: Path) -> None: _, overlay_id = _create_user_and_overlay("ws", "workshop") cache_root = env / "workshop_cache" item_a = _add_workshop_item("1001", downloaded=True, cache_root=cache_root) item_b = _add_workshop_item("1002", downloaded=True, cache_root=cache_root) _associate(overlay_id, item_a) _associate(overlay_id, item_b) addons = env / "overlays" / "7" / "left4dead2" / "addons" with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False) assert (addons / "1002.vpk").is_symlink() # Remove the association for 1002. with session_scope() as s: s.query(OverlayWorkshopItem).filter_by(workshop_item_id=item_b).delete() out, err, on_stdout, on_stderr = _capture_logs() with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False) assert (addons / "1001.vpk").is_symlink() assert not (addons / "1002.vpk").exists() # Cache file must remain — overlays are diff-applied, cache is shared. assert (cache_root / "1002.vpk").exists() def test_workshop_builder_leaves_unrelated_files_alone(env: Path) -> None: _, overlay_id = _create_user_and_overlay("ws", "workshop") cache_root = env / "workshop_cache" item = _add_workshop_item("1001", downloaded=True, cache_root=cache_root) _associate(overlay_id, item) addons = env / "overlays" / "7" / "left4dead2" / "addons" addons.mkdir(parents=True, exist_ok=True) (addons / "manual_addon.vpk").write_bytes(b"hand-placed") with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() overlay_builders.BUILDERS["workshop"].build(overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False) # Manual file is preserved. assert (addons / "manual_addon.vpk").read_bytes() == b"hand-placed" # Workshop symlink is created alongside. assert (addons / "1001.vpk").is_symlink() def test_workshop_builder_honors_should_cancel(env: Path) -> None: _, overlay_id = _create_user_and_overlay("ws", "workshop") cache_root = env / "workshop_cache" items = [_add_workshop_item(f"100{i}", downloaded=True, cache_root=cache_root) for i in range(3)] for it in items: _associate(overlay_id, it) cancel_calls = {"n": 0} def cancel(): cancel_calls["n"] += 1 return cancel_calls["n"] > 0 # cancel immediately with session_scope() as s: overlay = s.query(Overlay).filter_by(id=overlay_id).one() # Should not crash; partial state is consistent (re-run heals). overlay_builders.BUILDERS["workshop"].build( overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=cancel ) # --- ScriptBuilder --------------------------------------------------------- def _script_overlay(*, id_: int = 42, script: str = "echo hi") -> SimpleNamespace: return SimpleNamespace(id=id_, type="script", path=str(id_), script=script) def test_script_builder_invokes_helper(env, monkeypatch) -> None: captured: dict = {} def fake_run(cmd, *, on_stdout, on_stderr, should_cancel): captured["cmd"] = list(cmd) captured["script_text"] = open(cmd[-1]).read() captured["script_path_existed"] = os.path.exists(cmd[-1]) return CommandResult(returncode=0, stdout="", stderr="") monkeypatch.setattr(overlay_builders, "run_command", fake_run) monkeypatch.setattr( overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None ) overlay = _script_overlay() overlay_builders.ScriptBuilder().build( overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False, ) assert captured["cmd"][:4] == [ "sudo", "-n", "/usr/local/libexec/left4me/left4me-script-sandbox", "42", ] assert captured["script_text"] == "echo hi" assert captured["script_path_existed"] is True # Tmpfile is unlinked after build. assert not os.path.exists(captured["cmd"][-1]) def test_script_builder_disk_cap(env, monkeypatch) -> None: monkeypatch.setattr( overlay_builders, "run_command", lambda *a, **kw: CommandResult(returncode=0, stdout="", stderr=""), ) monkeypatch.setattr( overlay_builders.subprocess, "check_output", lambda *a, **kw: b"25000000000\t/some/path\n", ) err: list[str] = [] overlay = _script_overlay(script="") with pytest.raises(overlay_builders.BuildError): overlay_builders.ScriptBuilder().build( overlay, on_stdout=lambda _x: None, on_stderr=err.append, should_cancel=lambda: False, ) assert any("20" in line and "GB" in line for line in err), err def test_script_builder_streams_output(env, monkeypatch) -> None: def fake_run(cmd, *, on_stdout, on_stderr, should_cancel): on_stdout("hello") on_stderr("warn") return CommandResult(returncode=0, stdout="hello", stderr="warn") monkeypatch.setattr(overlay_builders, "run_command", fake_run) monkeypatch.setattr( overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None ) out: list[str] = [] err: list[str] = [] overlay = _script_overlay(script="") overlay_builders.ScriptBuilder().build( overlay, on_stdout=out.append, on_stderr=err.append, should_cancel=lambda: False ) assert out == ["hello"] assert err == ["warn"] def test_script_builder_passes_should_cancel_through(env, monkeypatch) -> None: captured: dict = {} def fake_run(cmd, *, on_stdout, on_stderr, should_cancel): captured["should_cancel"] = should_cancel raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="") monkeypatch.setattr(overlay_builders, "run_command", fake_run) monkeypatch.setattr( overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None ) overlay = _script_overlay(script="") with pytest.raises(CommandCancelledError): overlay_builders.ScriptBuilder().build( overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: True, ) assert captured["should_cancel"]() is True def test_script_builder_cleans_up_tmpfile_on_failure(env, monkeypatch) -> None: captured: dict = {} def fake_run(cmd, *, on_stdout, on_stderr, should_cancel): captured["script_path"] = cmd[-1] raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="") monkeypatch.setattr(overlay_builders, "run_command", fake_run) overlay = _script_overlay(script="") with pytest.raises(CommandCancelledError): overlay_builders.ScriptBuilder().build( overlay, on_stdout=lambda _x: None, on_stderr=lambda _x: None, should_cancel=lambda: False, ) assert not os.path.exists(captured["script_path"])