Commit 16adc5c silently dropped two defensive guards from the
symlink-creation loop in WorkshopBuilder.build. Restore them:
- refuse to overwrite a non-symlink file that collides with a workshop
  name (log a message and skip creation)
- refuse to overwrite a foreign symlink (target outside the cache root)
Also: change `skipped` from list to set (O(1) membership test, no
duplicates possible), and add a brief comment above WorkshopMetadata
construction explaining which fields download_to_cache actually uses.
Two regression tests added to pin the guard behaviour.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
758 lines
30 KiB
Python
758 lines
30 KiB
Python
"""Tests for overlay builders (registry, WorkshopBuilder, ScriptBuilder)."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import Overlay, OverlayWorkshopItem, User, WorkshopItem
|
|
from l4d2web.services import overlay_builders
|
|
from l4d2web.services.host_commands import CommandCancelledError, CommandResult
|
|
|
|
|
|
@pytest.fixture
def env(tmp_path, monkeypatch):
    """Provide an isolated application environment rooted at ``tmp_path``.

    ``DATABASE_URL`` is pointed at a SQLite file inside the temporary
    directory and ``LEFT4ME_ROOT`` at the directory itself; the database is
    then initialised and the directory is yielded to the test.
    """
    db_file = tmp_path / "b.db"
    settings = (
        ("DATABASE_URL", f"sqlite:///{db_file}"),
        ("LEFT4ME_ROOT", str(tmp_path)),
    )
    for variable, value in settings:
        monkeypatch.setenv(variable, value)
    init_db()
    yield tmp_path
|
|
|
|
|
|
def _create_user_and_overlay(name: str, type_: str) -> tuple[int, int]:
    """Insert a user and an overlay owned by that user.

    The user is always ``alice`` and the overlay path is always ``"7"``;
    only the overlay's name and type come from the caller.

    Returns:
        ``(user_id, overlay_id)`` of the freshly inserted rows.
    """
    with session_scope() as session:
        owner = User(username="alice", password_digest="x")
        session.add(owner)
        # Flush before reading the id so the overlay can reference it.
        session.flush()
        owner_id = owner.id

        row = Overlay(name=name, path="7", type=type_, user_id=owner_id)
        session.add(row)
        session.flush()
        ids = (owner_id, row.id)
    return ids
|
|
|
|
|
|
def _add_workshop_item(steam_id: str, *, downloaded: bool, cache_root: Path, content: bytes = b"x") -> int:
    """Insert a ``WorkshopItem`` row and, optionally, its cached file.

    When ``downloaded`` is true, ``<cache_root>/<steam_id>.vpk`` is written
    with ``content`` and its atime/mtime are set to 1700000000; the row then
    records the matching size, update time and a download timestamp.
    Otherwise no file is written and the row carries zero/``None`` values.

    Returns:
        The primary key of the inserted row.
    """
    stamp = 1700000000
    if downloaded:
        cache_root.mkdir(parents=True, exist_ok=True)
        vpk = cache_root / f"{steam_id}.vpk"
        vpk.write_bytes(content)
        os.utime(vpk, (stamp, stamp))
        size, updated, fetched_at = len(content), stamp, datetime.now(UTC)
    else:
        size, updated, fetched_at = 0, 0, None

    with session_scope() as session:
        item = WorkshopItem(
            steam_id=steam_id,
            title=f"item-{steam_id}",
            filename=f"orig-{steam_id}.vpk",
            file_url=f"https://example.com/{steam_id}.vpk",
            file_size=size,
            time_updated=updated,
            last_downloaded_at=fetched_at,
        )
        session.add(item)
        session.flush()
        item_id = item.id
    return item_id
|
|
|
|
|
|
def _associate(overlay_id: int, item_id: int) -> None:
    """Link a workshop item to an overlay via an ``OverlayWorkshopItem`` row."""
    link = OverlayWorkshopItem(overlay_id=overlay_id, workshop_item_id=item_id)
    with session_scope() as session:
        session.add(link)
|
|
|
|
|
|
def _capture_logs():
|
|
out: list[str] = []
|
|
err: list[str] = []
|
|
return out, err, out.append, err.append
|
|
|
|
|
|
def test_builders_registry() -> None:
    """The registry exposes exactly the workshop, script and files builders."""
    registered = set(overlay_builders.BUILDERS)
    assert registered == {"workshop", "script", "files"}
|
|
|
|
|
|
def test_registry_excludes_legacy_types() -> None:
    """None of the legacy overlay type names appear in the registry."""
    legacy_types = ("external", "l4d2center_maps", "cedapug_maps")
    still_registered = [
        legacy for legacy in legacy_types if legacy in overlay_builders.BUILDERS
    ]
    assert not still_registered
|
|
|
|
|
|
def test_files_builder_is_idempotent_no_op(monkeypatch, tmp_path) -> None:
    """The files builder only ensures the overlay directory exists.

    A second build against an overlay that already holds content must leave
    that content untouched and must not write anything to stderr.
    """
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))

    class O:
        id = 42
        name = "files-fixture"

    overlay = O()
    _, err, on_stdout, on_stderr = _capture_logs()

    def run_build() -> None:
        overlay_builders.BUILDERS["files"].build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    run_build()
    overlay_dir = tmp_path / "overlays" / "42"
    assert overlay_dir.is_dir()

    marker = overlay_dir / "kept.txt"
    marker.write_text("preserved")

    run_build()
    assert marker.read_text() == "preserved"
    assert err == []
|
|
|
|
|
|
def test_registry_unknown_type_raises_keyerror() -> None:
    """Looking up an unregistered overlay type raises ``KeyError``."""
    registry = overlay_builders.BUILDERS
    with pytest.raises(KeyError):
        registry["nope"]
|
|
|
|
|
|
def test_workshop_builder_unchanged() -> None:
    """Guard against the workshop builder being dropped during a refactor."""
    workshop = overlay_builders.BUILDERS["workshop"]
    assert isinstance(workshop, overlay_builders.WorkshopBuilder)
    assert hasattr(workshop, "build")
|
|
|
|
|
|
def test_workshop_builder_creates_absolute_symlinks(env: Path) -> None:
    """Each associated, cached item gets an absolute symlink in ``addons``."""
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    fixtures = (("1001", b"AAA"), ("1002", b"BBBB"))
    for steam_id, payload in fixtures:
        item_id = _add_workshop_item(
            steam_id, downloaded=True, cache_root=cache_root, content=payload
        )
        _associate(overlay_id, item_id)

    _, _, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    for steam_id, _payload in fixtures:
        link = addons / f"{steam_id}.vpk"
        assert link.is_symlink()
        # Targets must be ABSOLUTE so they resolve in the host's namespace.
        assert os.path.isabs(os.readlink(link))
        # And they must resolve to the cache files.
        assert link.resolve() == (cache_root / f"{steam_id}.vpk").resolve()
|
|
|
|
|
|
|
|
def test_workshop_builder_rerun_is_idempotent(env: Path) -> None:
    """A second build keeps the existing symlink and reports it unchanged."""
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    _associate(
        overlay_id,
        _add_workshop_item("1001", downloaded=True, cache_root=cache_root),
    )
    link = env / "overlays" / "7" / "left4dead2" / "addons" / "1001.vpk"

    def discard(_line):
        return None

    # First run creates the link.
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=discard,
            on_stderr=discard,
            should_cancel=lambda: False,
        )
    inode_before = link.lstat().st_ino

    # Second run must be a no-op.
    out, _, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )
    inode_after = link.lstat().st_ino

    assert inode_before == inode_after, "symlink should be untouched on idempotent rebuild"
    assert any("unchanged" in line.lower() for line in out), out
|
|
|
|
|
|
def test_workshop_builder_removes_obsolete_symlinks(env: Path) -> None:
    """Dropping an association removes its symlink but keeps the cache file."""
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    kept_item = _add_workshop_item("1001", downloaded=True, cache_root=cache_root)
    dropped_item = _add_workshop_item("1002", downloaded=True, cache_root=cache_root)
    for item_id in (kept_item, dropped_item):
        _associate(overlay_id, item_id)

    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    kept_link = addons / "1001.vpk"
    dropped_link = addons / "1002.vpk"

    def discard(_line):
        return None

    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=discard,
            on_stderr=discard,
            should_cancel=lambda: False,
        )
    assert dropped_link.is_symlink()

    # Remove the association for 1002.
    with session_scope() as session:
        session.query(OverlayWorkshopItem).filter_by(
            workshop_item_id=dropped_item
        ).delete()

    _, _, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    assert kept_link.is_symlink()
    assert not dropped_link.exists()
    # Cache file must remain — overlays are diff-applied, cache is shared.
    assert (cache_root / "1002.vpk").exists()
|
|
|
|
|
|
def test_workshop_builder_leaves_unrelated_files_alone(env: Path) -> None:
    """A hand-placed file in ``addons`` survives a workshop build."""
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item(
        "1001", downloaded=True, cache_root=env / "workshop_cache"
    )
    _associate(overlay_id, item_id)

    addons = env / "overlays" / "7" / "left4dead2" / "addons"
    addons.mkdir(parents=True, exist_ok=True)
    manual = addons / "manual_addon.vpk"
    manual.write_bytes(b"hand-placed")

    def discard(_line):
        return None

    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=discard,
            on_stderr=discard,
            should_cancel=lambda: False,
        )

    # Manual file is preserved.
    assert manual.read_bytes() == b"hand-placed"
    # Workshop symlink is created alongside.
    assert (addons / "1001.vpk").is_symlink()
|
|
|
|
|
|
def test_workshop_builder_honors_should_cancel(env: Path) -> None:
    """A build whose cancel callback is always true completes without raising."""
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    cache_root = env / "workshop_cache"
    for index in range(3):
        item_id = _add_workshop_item(
            f"100{index}", downloaded=True, cache_root=cache_root
        )
        _associate(overlay_id, item_id)

    polls = 0

    def cancel():
        nonlocal polls
        polls += 1
        return polls > 0  # cancel immediately

    def discard(_line):
        return None

    with session_scope() as session:
        overlay = session.query(Overlay).filter_by(id=overlay_id).one()
        # Should not crash; partial state is consistent (re-run heals).
        overlay_builders.BUILDERS["workshop"].build(
            overlay,
            on_stdout=discard,
            on_stderr=discard,
            should_cancel=cancel,
        )
|
|
|
|
|
|
# --- ScriptBuilder ---------------------------------------------------------
|
|
|
|
def _script_overlay(*, id_: int = 42, script: str = "echo hi") -> SimpleNamespace:
|
|
return SimpleNamespace(id=id_, type="script", path=str(id_), script=script)
|
|
|
|
|
|
def test_script_builder_invokes_helper(env, monkeypatch) -> None:
    """The script builder runs the sandbox helper on a temporary script file.

    The fake runner records the command line, the script file's content and
    whether the file existed while the command ran. Afterwards the test
    checks the command prefix, the script text, and that the temporary file
    is gone once ``build`` returns.
    """
    captured: dict = {}

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        captured["cmd"] = list(cmd)
        # Read under a context manager so the handle is closed promptly
        # instead of relying on garbage collection.
        with open(cmd[-1]) as script_file:
            captured["script_text"] = script_file.read()
        captured["script_path_existed"] = os.path.exists(cmd[-1])
        return CommandResult(returncode=0, stdout="", stderr="")

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    monkeypatch.setattr(
        overlay_builders.ScriptBuilder, "_enforce_disk_budget", lambda *a, **kw: None
    )

    overlay = _script_overlay()
    overlay_builders.ScriptBuilder().build(
        overlay,
        on_stdout=lambda _x: None,
        on_stderr=lambda _x: None,
        should_cancel=lambda: False,
    )

    assert captured["cmd"][:4] == [
        "sudo",
        "-n",
        "/usr/local/libexec/left4me/left4me-script-sandbox",
        "42",
    ]
    assert captured["script_text"] == "echo hi"
    assert captured["script_path_existed"] is True
    # Tmpfile is unlinked after build.
    assert not os.path.exists(captured["cmd"][-1])
|
|
|
|
|
|
def test_script_builder_disk_cap(env, monkeypatch) -> None:
    """An oversized overlay makes the script build fail with ``BuildError``.

    ``subprocess.check_output`` is stubbed to report 25000000000 for the
    path, and the test expects a stderr line mentioning both "20" and "GB".
    """

    def fake_run(*_args, **_kwargs):
        return CommandResult(returncode=0, stdout="", stderr="")

    def fake_check_output(*_args, **_kwargs):
        return b"25000000000\t/some/path\n"

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    monkeypatch.setattr(overlay_builders.subprocess, "check_output", fake_check_output)

    stderr_lines: list[str] = []
    overlay = _script_overlay(script="")

    with pytest.raises(overlay_builders.BuildError):
        overlay_builders.ScriptBuilder().build(
            overlay,
            on_stdout=lambda _x: None,
            on_stderr=stderr_lines.append,
            should_cancel=lambda: False,
        )

    assert any("20" in line and "GB" in line for line in stderr_lines), stderr_lines
|
|
|
|
|
|
def test_script_builder_streams_output(env, monkeypatch) -> None:
    """Lines emitted by the command runner reach the caller's callbacks."""

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        on_stdout("hello")
        on_stderr("warn")
        return CommandResult(returncode=0, stdout="hello", stderr="warn")

    def skip_budget(*_args, **_kwargs):
        return None

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    monkeypatch.setattr(overlay_builders.ScriptBuilder, "_enforce_disk_budget", skip_budget)

    stdout_lines: list[str] = []
    stderr_lines: list[str] = []
    overlay_builders.ScriptBuilder().build(
        _script_overlay(script=""),
        on_stdout=stdout_lines.append,
        on_stderr=stderr_lines.append,
        should_cancel=lambda: False,
    )

    assert stdout_lines == ["hello"]
    assert stderr_lines == ["warn"]
|
|
|
|
|
|
def test_script_builder_passes_should_cancel_through(env, monkeypatch) -> None:
    """The cancel callback reaches the runner and its error propagates."""
    seen: dict = {}

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        seen["should_cancel"] = should_cancel
        raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="")

    def skip_budget(*_args, **_kwargs):
        return None

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)
    monkeypatch.setattr(overlay_builders.ScriptBuilder, "_enforce_disk_budget", skip_budget)

    builder = overlay_builders.ScriptBuilder()
    overlay = _script_overlay(script="")
    with pytest.raises(CommandCancelledError):
        builder.build(
            overlay,
            on_stdout=lambda _x: None,
            on_stderr=lambda _x: None,
            should_cancel=lambda: True,
        )

    # The callable handed to the runner still answers "cancel".
    assert seen["should_cancel"]() is True
|
|
|
|
|
|
def test_script_builder_cleans_up_tmpfile_on_failure(env, monkeypatch) -> None:
    """The temporary script file is removed even when the runner raises."""
    seen: dict = {}

    def fake_run(cmd, *, on_stdout, on_stderr, should_cancel):
        # The script path is the final command argument.
        seen["script_path"] = cmd[-1]
        raise CommandCancelledError(returncode=1, cmd=cmd, output="", stderr="")

    monkeypatch.setattr(overlay_builders, "run_command", fake_run)

    builder = overlay_builders.ScriptBuilder()
    overlay = _script_overlay(script="")
    with pytest.raises(CommandCancelledError):
        builder.build(
            overlay,
            on_stdout=lambda _x: None,
            on_stderr=lambda _x: None,
            should_cancel=lambda: False,
        )

    assert not os.path.exists(seen["script_path"])
|
|
|
|
|
|
def test_sleep_with_cancel_returns_normally_when_not_cancelled():
    """Without a cancel request the sleep reports ``False``."""
    from l4d2web.services.overlay_builders import _sleep_with_cancel

    def never_cancel():
        return False

    result = _sleep_with_cancel(0.05, never_cancel, poll_interval=0.01)
    assert result is False
|
|
|
|
|
|
def test_sleep_with_cancel_returns_early_when_cancelled():
    """A cancel raised mid-sleep wakes the sleeper well before the deadline."""
    import threading
    import time

    from l4d2web.services.overlay_builders import _sleep_with_cancel

    cancel_requested = threading.Event()
    # Flip the flag from another thread 50 ms into the 5 s sleep.
    threading.Timer(0.05, cancel_requested.set).start()

    started = time.monotonic()
    cancelled = _sleep_with_cancel(5.0, cancel_requested.is_set, poll_interval=0.01)
    elapsed = time.monotonic() - started

    assert cancelled is True
    assert elapsed < 0.5, f"should have woken up promptly, slept {elapsed:.3f}s"
|
|
|
|
|
|
def test_download_with_retry_succeeds_on_first_attempt(env, tmp_path, monkeypatch):
    """A download that works immediately runs once and logs nothing."""
    from l4d2web.services import overlay_builders, steam_workshop

    invocations = []

    def fake_download(meta, cache_root, *, should_cancel=None):
        invocations.append(1)
        return cache_root / f"{meta.steam_id}.vpk"

    monkeypatch.setattr(overlay_builders, "_sleep_with_cancel", lambda *a, **kw: False)
    monkeypatch.setattr(overlay_builders, "download_to_cache", fake_download)

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="x",
        filename="x.vpk",
        file_url="http://e/x.vpk",
        file_size=1,
        time_updated=1700000000,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    _, err, _, on_stderr = _capture_logs()
    overlay_builders._download_with_retry(
        meta,
        tmp_path / "cache",
        on_stderr=on_stderr,
        should_cancel=lambda: False,
    )

    assert invocations == [1]
    assert err == []
|
|
|
|
|
|
def test_download_with_retry_retries_then_succeeds(env, tmp_path, monkeypatch):
    """Two connection errors followed by success: three attempts, two log lines."""
    import requests

    from l4d2web.services import overlay_builders, steam_workshop

    attempts = 0

    def flaky_download(meta, cache_root, *, should_cancel=None):
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise requests.ConnectionError("boom")

    monkeypatch.setattr(overlay_builders, "_sleep_with_cancel", lambda *a, **kw: False)
    monkeypatch.setattr(overlay_builders, "download_to_cache", flaky_download)

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="x",
        filename="x.vpk",
        file_url="http://e/x.vpk",
        file_size=1,
        time_updated=1700000000,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    _, err, _, on_stderr = _capture_logs()
    overlay_builders._download_with_retry(
        meta,
        tmp_path / "cache",
        on_stderr=on_stderr,
        should_cancel=lambda: False,
    )

    assert attempts == 3
    failure_lines = [line for line in err if "attempt" in line and "failed" in line]
    assert len(failure_lines) == 2
|
|
|
|
|
|
def test_download_with_retry_exhausts_and_raises(env, tmp_path, monkeypatch):
    """A download that always fails re-raises the connection error."""
    import requests

    from l4d2web.services import overlay_builders, steam_workshop

    def always_failing_download(meta, cache_root, *, should_cancel=None):
        raise requests.ConnectionError("permanent")

    monkeypatch.setattr(overlay_builders, "_sleep_with_cancel", lambda *a, **kw: False)
    monkeypatch.setattr(overlay_builders, "download_to_cache", always_failing_download)

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="x",
        filename="x.vpk",
        file_url="http://e/x.vpk",
        file_size=1,
        time_updated=1700000000,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    _, err, _, on_stderr = _capture_logs()
    with pytest.raises(requests.ConnectionError):
        overlay_builders._download_with_retry(
            meta,
            tmp_path / "cache",
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    # Two stderr "attempt N/3 failed" lines for attempts 1 and 2; the final
    # attempt re-raises without logging.
    failure_lines = [line for line in err if "attempt" in line and "failed" in line]
    assert len(failure_lines) == 2
|
|
|
|
|
|
def test_download_with_retry_propagates_interrupted(env, tmp_path, monkeypatch):
    """An ``InterruptedError`` from the downloader escapes the retry wrapper."""
    from l4d2web.services import overlay_builders, steam_workshop

    def interrupted_download(meta, cache_root, *, should_cancel=None):
        raise InterruptedError("cancelled")

    monkeypatch.setattr(overlay_builders, "download_to_cache", interrupted_download)

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="x",
        filename="x.vpk",
        file_url="http://e/x.vpk",
        file_size=1,
        time_updated=1700000000,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    _, _, _, on_stderr = _capture_logs()
    with pytest.raises(InterruptedError):
        overlay_builders._download_with_retry(
            meta,
            tmp_path / "cache",
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )
|
|
|
|
|
|
def test_download_with_retry_bails_when_cancelled_during_backoff(env, tmp_path, monkeypatch):
    """A cancel reported by the backoff sleep surfaces as ``InterruptedError``."""
    import requests

    from l4d2web.services import overlay_builders, steam_workshop

    def failing_download(meta, cache_root, *, should_cancel=None):
        raise requests.ConnectionError("boom")

    def cancelled_sleep(*_args, **_kwargs):
        # Report that the sleep was interrupted by a cancel request.
        return True

    monkeypatch.setattr(overlay_builders, "download_to_cache", failing_download)
    monkeypatch.setattr(overlay_builders, "_sleep_with_cancel", cancelled_sleep)

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="x",
        filename="x.vpk",
        file_url="http://e/x.vpk",
        file_size=1,
        time_updated=1700000000,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    _, _, _, on_stderr = _capture_logs()
    with pytest.raises(InterruptedError):
        overlay_builders._download_with_retry(
            meta,
            tmp_path / "cache",
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )
|
|
|
|
|
|
def _make_meta_from_db_row(steam_id: str, *, file_size: int, time_updated: int):
    """Build a ``WorkshopMetadata`` using the same naming as ``_add_workshop_item``.

    Title, filename and URL are derived from ``steam_id``; size and update
    time come from the caller; the remaining fields are fixed.
    """
    from l4d2web.services import steam_workshop

    fields = {
        "steam_id": steam_id,
        "title": f"item-{steam_id}",
        "filename": f"orig-{steam_id}.vpk",
        "file_url": f"https://example.com/{steam_id}.vpk",
        "file_size": file_size,
        "time_updated": time_updated,
        "preview_url": "",
        "consumer_app_id": 550,
        "result": 1,
    }
    return steam_workshop.WorkshopMetadata(**fields)
|
|
|
|
|
|
def test_workshop_build_downloads_uncached_and_stamps_timestamp(env, tmp_path, monkeypatch):
    """An uncached item is downloaded, stamped in the DB and symlinked.

    The fake downloader writes the cache file itself. After the build the
    row must have ``last_downloaded_at`` set and an empty ``last_error``,
    and the overlay's ``addons`` directory must contain the symlink.
    """
    # One ordinary local import replaces the former ``__import__`` call and
    # the second aliased import further down.
    from sqlalchemy import select

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    cache_root = tmp_path / "workshop_cache"
    user_id, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item("2001", downloaded=False, cache_root=cache_root)
    _associate(overlay_id, item_id)

    download_calls = []

    def fake_download(meta, cache_root_arg, *, should_cancel=None):
        download_calls.append(meta.steam_id)
        cache_root_arg.mkdir(parents=True, exist_ok=True)
        target = cache_root_arg / f"{meta.steam_id}.vpk"
        target.write_bytes(b"data")
        os.utime(target, (meta.time_updated, meta.time_updated))

    monkeypatch.setattr(overlay_builders, "download_to_cache", fake_download)

    out, err, on_stdout, on_stderr = _capture_logs()
    with session_scope() as s:
        overlay = s.scalar(select(Overlay).where(Overlay.id == overlay_id))
        # Detach so the instance stays usable once the session has closed.
        s.expunge(overlay)
    overlay_builders.WorkshopBuilder().build(
        overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False,
    )

    assert download_calls == ["2001"]
    with session_scope() as s:
        wi = s.scalar(select(WorkshopItem).where(WorkshopItem.id == item_id))
        assert wi.last_downloaded_at is not None
        assert wi.last_error == ""

    addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
    assert (addons / "2001.vpk").is_symlink()
|
|
|
|
|
|
def test_workshop_build_skips_already_cached(env, tmp_path, monkeypatch):
    """A cache file matching the DB row's size and mtime is not re-downloaded.

    The downloader is replaced by a recorder; the test requires that it is
    never called and that the symlink is still created.
    """
    # Single local import; the original imported ``select`` twice under an
    # alias, and the first of those was never used.
    from sqlalchemy import select, update

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    cache_root = tmp_path / "workshop_cache"
    user_id, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item("2002", downloaded=True, cache_root=cache_root)
    # Make the cache file's (mtime, size) match the DB row exactly.
    file_path = cache_root / "2002.vpk"
    os.utime(file_path, (1700000000, 1700000000))
    with session_scope() as s:
        s.execute(update(WorkshopItem).where(WorkshopItem.id == item_id).values(
            file_size=os.path.getsize(file_path), time_updated=1700000000,
        ))
    _associate(overlay_id, item_id)

    called = []
    monkeypatch.setattr(
        overlay_builders, "download_to_cache",
        lambda *a, **kw: called.append(1),
    )

    out, err, on_stdout, on_stderr = _capture_logs()
    with session_scope() as s:
        overlay = s.scalar(select(Overlay).where(Overlay.id == overlay_id))
        # Detach so the instance stays usable once the session has closed.
        s.expunge(overlay)
    overlay_builders.WorkshopBuilder().build(
        overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=lambda: False,
    )

    assert called == [], "should not call downloader for an already-cached item"
    addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
    assert (addons / "2002.vpk").is_symlink()
|
|
|
|
|
|
def test_workshop_build_redownloads_stale_cache(env, tmp_path, monkeypatch):
    """A cache file whose size/mtime disagree with the DB row is fetched again."""
    from sqlalchemy import select as _sel
    from sqlalchemy import update as _upd

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    cache_root = tmp_path / "workshop_cache"
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item("2003", downloaded=True, cache_root=cache_root)

    # Make the row disagree with the file that is on disk.
    stale_values = {"file_size": 99, "time_updated": 1800000000}
    with session_scope() as session:
        session.execute(
            _upd(WorkshopItem).where(WorkshopItem.id == item_id).values(**stale_values)
        )
    _associate(overlay_id, item_id)

    fetched = []

    def fake_download(meta, cache_root_arg, *, should_cancel=None):
        fetched.append(meta.steam_id)
        target = cache_root_arg / f"{meta.steam_id}.vpk"
        target.write_bytes(b"99bytes____99bytes____99bytes____99bytes____99bytes____99bytes____99bytes____99bytes____99bytes____")
        os.utime(target, (meta.time_updated, meta.time_updated))

    monkeypatch.setattr(overlay_builders, "download_to_cache", fake_download)

    _, _, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.scalar(_sel(Overlay).where(Overlay.id == overlay_id))
        session.expunge(overlay)
    overlay_builders.WorkshopBuilder().build(
        overlay,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        should_cancel=lambda: False,
    )

    assert fetched == ["2003"]
|
|
|
|
|
|
def test_workshop_build_skips_items_with_no_file_url(env, tmp_path, monkeypatch):
    """An item without a file URL is reported as skipped and never downloaded."""
    from sqlalchemy import select as _sel

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    with session_scope() as session:
        gone = WorkshopItem(
            steam_id="2004",
            title="gone",
            filename="",
            file_url="",
            file_size=0,
            time_updated=0,
            preview_url="",
            last_downloaded_at=None,
            last_error="steam result 9",
        )
        session.add(gone)
        session.flush()
        item_id = gone.id
    _associate(overlay_id, item_id)

    def forbidden_download(*_args, **_kwargs):
        raise AssertionError("must not be called")

    monkeypatch.setattr(overlay_builders, "download_to_cache", forbidden_download)

    _, err, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.scalar(_sel(Overlay).where(Overlay.id == overlay_id))
        session.expunge(overlay)
    overlay_builders.WorkshopBuilder().build(
        overlay,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        should_cancel=lambda: False,
    )

    assert any("2004" in line and "skipped" in line for line in err)
    addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
    assert not (addons / "2004.vpk").exists()
|
|
|
|
|
|
def test_workshop_build_fails_when_all_retries_exhausted(env, tmp_path, monkeypatch):
    """A permanently failing download aborts the build and records the error."""
    import requests
    from sqlalchemy import select as _sel

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item(
        "2005", downloaded=False, cache_root=tmp_path / "workshop_cache"
    )
    _associate(overlay_id, item_id)

    def failing_download(*_args, **_kwargs):
        raise requests.ConnectionError("net")

    monkeypatch.setattr(overlay_builders, "_sleep_with_cancel", lambda *a, **kw: False)
    monkeypatch.setattr(overlay_builders, "download_to_cache", failing_download)

    _, _, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.scalar(_sel(Overlay).where(Overlay.id == overlay_id))
        session.expunge(overlay)
    with pytest.raises(requests.ConnectionError):
        overlay_builders.WorkshopBuilder().build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: False,
        )

    with session_scope() as session:
        row = session.scalar(_sel(WorkshopItem).where(WorkshopItem.id == item_id))
        assert "download failed" in row.last_error
|
|
|
|
|
|
def test_workshop_build_cancels_cleanly_during_download_phase(env, tmp_path, monkeypatch):
    """An interruption raised inside the downloader propagates out of ``build``."""
    from sqlalchemy import select as _sel

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item(
        "2006", downloaded=False, cache_root=tmp_path / "workshop_cache"
    )
    _associate(overlay_id, item_id)

    cancelled = False

    def fake_download(meta, cache_root, *, should_cancel=None):
        # Flip the flag so should_cancel reports true from here on.
        nonlocal cancelled
        cancelled = True
        raise InterruptedError("cancelled")

    monkeypatch.setattr(overlay_builders, "download_to_cache", fake_download)

    _, _, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.scalar(_sel(Overlay).where(Overlay.id == overlay_id))
        session.expunge(overlay)
    with pytest.raises(InterruptedError):
        overlay_builders.WorkshopBuilder().build(
            overlay,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=lambda: cancelled,
        )
|
|
|
|
|
|
def test_workshop_build_refuses_to_overwrite_non_symlink_file(env, tmp_path, monkeypatch):
    """A plain file occupying a workshop symlink name is left untouched.

    The build must log a refusal on stderr rather than crash or replace the
    file.
    """
    from sqlalchemy import select as _sel

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item(
        "3001", downloaded=True, cache_root=tmp_path / "workshop_cache"
    )
    _associate(overlay_id, item_id)

    # Pre-create a plain file (not a symlink) where the builder would place its symlink.
    addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
    addons.mkdir(parents=True, exist_ok=True)
    squatter = addons / "3001.vpk"
    squatter.write_bytes(b"manual file, don't touch")

    _, err, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.scalar(_sel(Overlay).where(Overlay.id == overlay_id))
        session.expunge(overlay)
    overlay_builders.WorkshopBuilder().build(
        overlay,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        should_cancel=lambda: False,
    )

    # The plain file is still there, unchanged.
    assert squatter.exists()
    assert not squatter.is_symlink()
    assert squatter.read_bytes() == b"manual file, don't touch"
    # And a refusal message was logged.
    assert any("refusing to overwrite non-symlink" in line for line in err)
|
|
|
|
|
|
def test_workshop_build_refuses_to_overwrite_foreign_symlink(env, tmp_path, monkeypatch):
    """A symlink whose target lies outside the workshop cache is left alone.

    The build neither replaces the link nor fails; it logs a refusal on
    stderr.
    """
    from sqlalchemy import select as _sel

    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    _, overlay_id = _create_user_and_overlay("ws", "workshop")
    item_id = _add_workshop_item(
        "3002", downloaded=True, cache_root=tmp_path / "workshop_cache"
    )
    _associate(overlay_id, item_id)

    # Pre-create a symlink pointing outside the cache root.
    foreign_target = tmp_path / "elsewhere" / "thing.vpk"
    foreign_target.parent.mkdir(parents=True, exist_ok=True)
    foreign_target.write_bytes(b"some other vpk")
    addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
    addons.mkdir(parents=True, exist_ok=True)
    link = addons / "3002.vpk"
    os.symlink(foreign_target, link)

    _, err, on_stdout, on_stderr = _capture_logs()
    with session_scope() as session:
        overlay = session.scalar(_sel(Overlay).where(Overlay.id == overlay_id))
        session.expunge(overlay)
    overlay_builders.WorkshopBuilder().build(
        overlay,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        should_cancel=lambda: False,
    )

    # The foreign symlink still points where it did.
    assert link.is_symlink()
    assert os.readlink(link) == str(foreign_target)
    assert any("refusing to overwrite foreign symlink" in line for line in err)
|