feat(l4d2-web): steam workshop API client and downloader
Adds l4d2web/services/steam_workshop.py: parse_workshop_input (single ID, URL, or multi-line batch), resolve_collection (HTTPS POST to GetCollectionDetails), fetch_metadata_batch (HTTPS POST to GetPublishedFileDetails with consumer_app_id == 550 enforcement that raises WorkshopValidationError in add-mode and silently skips in refresh-mode), download_to_cache (atomic + idempotent on mtime+size), and refresh_all (ThreadPoolExecutor with per-item error collection). Adds requests as an explicit dependency. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2543a05c12
commit
c6b41429ee
3 changed files with 608 additions and 0 deletions
|
|
@ -14,6 +14,7 @@ dependencies = [
|
||||||
"alembic>=1.13",
|
"alembic>=1.13",
|
||||||
"PyYAML>=6.0",
|
"PyYAML>=6.0",
|
||||||
"gunicorn>=22.0",
|
"gunicorn>=22.0",
|
||||||
|
"requests>=2.31",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.setuptools]
|
[tool.setuptools]
|
||||||
|
|
|
||||||
295
l4d2web/services/steam_workshop.py
Normal file
295
l4d2web/services/steam_workshop.py
Normal file
|
|
@ -0,0 +1,295 @@
|
||||||
|
"""Steam Workshop API client + downloader.
|
||||||
|
|
||||||
|
Pure HTTP/file logic — no DB writes, no Flask, no job-worker integration.
|
||||||
|
Used by the workshop overlay builder and the admin refresh job.
|
||||||
|
|
||||||
|
Endpoints:
|
||||||
|
- GetCollectionDetails: resolve a collection ID to its child item IDs.
|
||||||
|
- GetPublishedFileDetails: batch-fetch metadata for items, including a public
|
||||||
|
file_url for the .vpk.
|
||||||
|
|
||||||
|
Both endpoints accept anonymous POSTs; no Steam Web API key required.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import threading
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Callable, Iterable, Literal
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
# HTTPS only (decision 16). The reference downloader uses HTTP — we don't.
|
||||||
|
GET_PUBLISHED_FILE_DETAILS_URL = (
|
||||||
|
"https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
|
||||||
|
)
|
||||||
|
GET_COLLECTION_DETAILS_URL = (
|
||||||
|
"https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
|
||||||
|
)
|
||||||
|
|
||||||
|
L4D2_APP_ID = 550
|
||||||
|
|
||||||
|
REQUEST_TIMEOUT_SECONDS = 30
|
||||||
|
DOWNLOAD_CHUNK_BYTES = 1_048_576
|
||||||
|
|
||||||
|
_NUMERIC_ID_RE = re.compile(r"^\d+$")
|
||||||
|
_URL_ID_RE = re.compile(r"^https?://([a-z0-9.-]*\.)?steamcommunity\.com/.*[?&]id=(\d+)", re.IGNORECASE)
|
||||||
|
_BARE_URL_ID_RE = re.compile(r"^([a-z0-9.-]*\.)?steamcommunity\.com/.*[?&]id=(\d+)", re.IGNORECASE)
|
||||||
|
|
||||||
|
_session_local = threading.local()
|
||||||
|
|
||||||
|
|
||||||
|
def _session() -> requests.Session:
|
||||||
|
"""Per-thread session for connection reuse without cross-thread leakage."""
|
||||||
|
sess = getattr(_session_local, "session", None)
|
||||||
|
if sess is None:
|
||||||
|
sess = requests.Session()
|
||||||
|
_session_local.session = sess
|
||||||
|
return sess
|
||||||
|
|
||||||
|
|
||||||
|
class WorkshopValidationError(ValueError):
|
||||||
|
"""Raised during user-add when an item fails a fixed precondition
|
||||||
|
(e.g. consumer_app_id != 550)."""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class WorkshopMetadata:
|
||||||
|
steam_id: str
|
||||||
|
title: str
|
||||||
|
filename: str
|
||||||
|
file_url: str
|
||||||
|
file_size: int
|
||||||
|
time_updated: int
|
||||||
|
preview_url: str
|
||||||
|
consumer_app_id: int
|
||||||
|
result: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class RefreshReport:
|
||||||
|
downloaded: int = 0
|
||||||
|
skipped: int = 0
|
||||||
|
errors: int = 0
|
||||||
|
per_item_errors: dict[str, str] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_workshop_input(raw: str) -> list[str]:
|
||||||
|
"""Parse a single ID, a single workshop URL, or a multi-line / whitespace-
|
||||||
|
separated batch of either. Returns deduplicated digit-only IDs in order.
|
||||||
|
Raises ValueError on garbage."""
|
||||||
|
if not raw or not raw.strip():
|
||||||
|
raise ValueError("input is empty")
|
||||||
|
|
||||||
|
tokens: list[str] = []
|
||||||
|
for token in re.split(r"\s+", raw.strip()):
|
||||||
|
if not token:
|
||||||
|
continue
|
||||||
|
tokens.append(_extract_id(token))
|
||||||
|
|
||||||
|
seen: set[str] = set()
|
||||||
|
deduped: list[str] = []
|
||||||
|
for tok in tokens:
|
||||||
|
if tok not in seen:
|
||||||
|
seen.add(tok)
|
||||||
|
deduped.append(tok)
|
||||||
|
return deduped
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_id(token: str) -> str:
|
||||||
|
if _NUMERIC_ID_RE.fullmatch(token):
|
||||||
|
return token
|
||||||
|
m = _URL_ID_RE.match(token)
|
||||||
|
if m:
|
||||||
|
return m.group(2)
|
||||||
|
m = _BARE_URL_ID_RE.match(token)
|
||||||
|
if m:
|
||||||
|
return m.group(2)
|
||||||
|
raise ValueError(f"could not parse a Steam workshop id from: {token!r}")
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_collection(collection_id: str) -> list[str]:
|
||||||
|
"""POST GetCollectionDetails for one collection; return its non-collection
|
||||||
|
child publishedfileids in order. Nested collections (filetype != 0) are
|
||||||
|
skipped."""
|
||||||
|
if not _NUMERIC_ID_RE.fullmatch(collection_id):
|
||||||
|
raise ValueError("collection_id must be digits only")
|
||||||
|
|
||||||
|
response = _session().post(
|
||||||
|
GET_COLLECTION_DETAILS_URL,
|
||||||
|
data={
|
||||||
|
"collectioncount": 1,
|
||||||
|
"publishedfileids[0]": collection_id,
|
||||||
|
},
|
||||||
|
timeout=REQUEST_TIMEOUT_SECONDS,
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
payload = response.json()
|
||||||
|
children: list[str] = []
|
||||||
|
for collection in payload.get("response", {}).get("collectiondetails", []):
|
||||||
|
for child in collection.get("children", []):
|
||||||
|
if child.get("filetype", 0) != 0:
|
||||||
|
continue # nested collection, skip
|
||||||
|
child_id = child.get("publishedfileid")
|
||||||
|
if child_id is not None:
|
||||||
|
children.append(str(child_id))
|
||||||
|
return children
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_metadata_batch(
|
||||||
|
steam_ids: list[str], *, mode: Literal["add", "refresh"]
|
||||||
|
) -> list[WorkshopMetadata]:
|
||||||
|
"""One POST to GetPublishedFileDetails covering all ids.
|
||||||
|
|
||||||
|
In `mode="add"`, any non-L4D2 (`consumer_app_id != 550`) raises
|
||||||
|
WorkshopValidationError so the user-add request fails cleanly.
|
||||||
|
|
||||||
|
In `mode="refresh"`, non-L4D2 entries are skipped from the result.
|
||||||
|
|
||||||
|
Items with `result != 1` are returned as-is (the caller persists the result
|
||||||
|
code into `WorkshopItem.last_error`).
|
||||||
|
"""
|
||||||
|
if not steam_ids:
|
||||||
|
return []
|
||||||
|
for sid in steam_ids:
|
||||||
|
if not _NUMERIC_ID_RE.fullmatch(sid):
|
||||||
|
raise ValueError(f"steam id must be digits only: {sid!r}")
|
||||||
|
|
||||||
|
payload: dict[str, str | int] = {"itemcount": len(steam_ids)}
|
||||||
|
for index, sid in enumerate(steam_ids):
|
||||||
|
payload[f"publishedfileids[{index}]"] = sid
|
||||||
|
|
||||||
|
response = _session().post(
|
||||||
|
GET_PUBLISHED_FILE_DETAILS_URL,
|
||||||
|
data=payload,
|
||||||
|
timeout=REQUEST_TIMEOUT_SECONDS,
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
body = response.json()
|
||||||
|
|
||||||
|
metas: list[WorkshopMetadata] = []
|
||||||
|
for entry in body.get("response", {}).get("publishedfiledetails", []):
|
||||||
|
meta = WorkshopMetadata(
|
||||||
|
steam_id=str(entry.get("publishedfileid", "")),
|
||||||
|
title=str(entry.get("title", "") or ""),
|
||||||
|
filename=str(entry.get("filename", "") or ""),
|
||||||
|
file_url=str(entry.get("file_url", "") or ""),
|
||||||
|
file_size=int(entry.get("file_size") or 0),
|
||||||
|
time_updated=int(entry.get("time_updated") or 0),
|
||||||
|
preview_url=str(entry.get("preview_url", "") or ""),
|
||||||
|
consumer_app_id=int(entry.get("consumer_app_id") or 0),
|
||||||
|
result=int(entry.get("result") or 0),
|
||||||
|
)
|
||||||
|
|
||||||
|
# consumer_app_id is only meaningful when the lookup itself succeeded.
|
||||||
|
if meta.result == 1 and meta.consumer_app_id != L4D2_APP_ID:
|
||||||
|
if mode == "add":
|
||||||
|
raise WorkshopValidationError(
|
||||||
|
f"item {meta.steam_id} is not a Left 4 Dead 2 workshop "
|
||||||
|
f"item (consumer_app_id={meta.consumer_app_id})"
|
||||||
|
)
|
||||||
|
# refresh mode: drop the entry silently from the batch
|
||||||
|
continue
|
||||||
|
|
||||||
|
metas.append(meta)
|
||||||
|
return metas
|
||||||
|
|
||||||
|
|
||||||
|
def download_to_cache(
|
||||||
|
meta: WorkshopMetadata,
|
||||||
|
cache_root: Path,
|
||||||
|
*,
|
||||||
|
on_progress: Callable[[int, int], None] | None = None,
|
||||||
|
should_cancel: Callable[[], bool] | None = None,
|
||||||
|
) -> Path:
|
||||||
|
"""Download `meta.file_url` to `cache_root/{steam_id}.vpk`.
|
||||||
|
|
||||||
|
Atomic via `*.partial` + `os.replace`. Idempotent: a no-op when the
|
||||||
|
existing file's `(mtime, size)` already matches `(time_updated, file_size)`.
|
||||||
|
Sets `os.utime(target, (time_updated, time_updated))` so the next run
|
||||||
|
short-circuits.
|
||||||
|
"""
|
||||||
|
if not _NUMERIC_ID_RE.fullmatch(meta.steam_id):
|
||||||
|
raise ValueError("meta.steam_id must be digits only")
|
||||||
|
cache_root.mkdir(parents=True, exist_ok=True)
|
||||||
|
target = cache_root / f"{meta.steam_id}.vpk"
|
||||||
|
|
||||||
|
if (
|
||||||
|
target.exists()
|
||||||
|
and int(target.stat().st_mtime) == int(meta.time_updated)
|
||||||
|
and int(target.stat().st_size) == int(meta.file_size)
|
||||||
|
):
|
||||||
|
return target
|
||||||
|
|
||||||
|
if not meta.file_url:
|
||||||
|
raise ValueError(f"item {meta.steam_id} has no file_url; cannot download")
|
||||||
|
|
||||||
|
partial = target.with_suffix(target.suffix + ".partial")
|
||||||
|
response = _session().get(meta.file_url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
written = 0
|
||||||
|
try:
|
||||||
|
with open(partial, "wb") as f:
|
||||||
|
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
|
||||||
|
if should_cancel is not None and should_cancel():
|
||||||
|
raise InterruptedError("download cancelled")
|
||||||
|
if not chunk:
|
||||||
|
continue
|
||||||
|
f.write(chunk)
|
||||||
|
written += len(chunk)
|
||||||
|
if on_progress is not None:
|
||||||
|
on_progress(written, int(meta.file_size))
|
||||||
|
os.replace(partial, target)
|
||||||
|
except BaseException:
|
||||||
|
partial.unlink(missing_ok=True)
|
||||||
|
raise
|
||||||
|
|
||||||
|
os.utime(target, (meta.time_updated, meta.time_updated))
|
||||||
|
return target
|
||||||
|
|
||||||
|
|
||||||
|
def refresh_all(
|
||||||
|
metas: Iterable[WorkshopMetadata],
|
||||||
|
cache_root: Path,
|
||||||
|
*,
|
||||||
|
executor_workers: int = 8,
|
||||||
|
should_cancel: Callable[[], bool] | None = None,
|
||||||
|
) -> RefreshReport:
|
||||||
|
"""Download (or skip-as-cached) every metadata item using a thread pool.
|
||||||
|
Per-item errors are collected; sibling items continue."""
|
||||||
|
metas_list = list(metas)
|
||||||
|
report = RefreshReport()
|
||||||
|
if not metas_list:
|
||||||
|
return report
|
||||||
|
|
||||||
|
cache_root.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=executor_workers) as executor:
|
||||||
|
futures = {}
|
||||||
|
for meta in metas_list:
|
||||||
|
if should_cancel is not None and should_cancel():
|
||||||
|
break
|
||||||
|
future = executor.submit(
|
||||||
|
download_to_cache,
|
||||||
|
meta,
|
||||||
|
cache_root,
|
||||||
|
should_cancel=should_cancel,
|
||||||
|
)
|
||||||
|
futures[future] = meta
|
||||||
|
|
||||||
|
for future in as_completed(futures):
|
||||||
|
meta = futures[future]
|
||||||
|
try:
|
||||||
|
future.result()
|
||||||
|
except Exception as exc:
|
||||||
|
report.errors += 1
|
||||||
|
report.per_item_errors[meta.steam_id] = str(exc)
|
||||||
|
continue
|
||||||
|
report.downloaded += 1
|
||||||
|
|
||||||
|
return report
|
||||||
312
l4d2web/tests/test_steam_workshop.py
Normal file
312
l4d2web/tests/test_steam_workshop.py
Normal file
|
|
@ -0,0 +1,312 @@
|
||||||
|
"""Tests for the Steam Workshop API client and downloader."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from l4d2web.services import steam_workshop
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_single_numeric() -> None:
|
||||||
|
assert steam_workshop.parse_workshop_input("12345") == ["12345"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_single_url() -> None:
|
||||||
|
url = "https://steamcommunity.com/sharedfiles/filedetails/?id=98765"
|
||||||
|
assert steam_workshop.parse_workshop_input(url) == ["98765"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_workshop_url_variant() -> None:
|
||||||
|
url = "steamcommunity.com/workshop/filedetails/?id=42"
|
||||||
|
assert steam_workshop.parse_workshop_input(url) == ["42"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_multiline_batch() -> None:
|
||||||
|
raw = """
|
||||||
|
12345
|
||||||
|
https://steamcommunity.com/sharedfiles/filedetails/?id=67890
|
||||||
|
99999
|
||||||
|
"""
|
||||||
|
assert steam_workshop.parse_workshop_input(raw) == ["12345", "67890", "99999"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_deduplicates_preserving_order() -> None:
|
||||||
|
raw = "100\n200\n100\n300"
|
||||||
|
assert steam_workshop.parse_workshop_input(raw) == ["100", "200", "300"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_rejects_garbage() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
steam_workshop.parse_workshop_input("not-a-number")
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_rejects_empty() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
steam_workshop.parse_workshop_input("")
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_workshop_input_rejects_non_steam_url() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
steam_workshop.parse_workshop_input("https://example.com/?id=12345")
|
||||||
|
|
||||||
|
|
||||||
|
def test_endpoints_are_https() -> None:
|
||||||
|
assert steam_workshop.GET_PUBLISHED_FILE_DETAILS_URL.startswith("https://")
|
||||||
|
assert steam_workshop.GET_COLLECTION_DETAILS_URL.startswith("https://")
|
||||||
|
assert "api.steampowered.com" in steam_workshop.GET_PUBLISHED_FILE_DETAILS_URL
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve_collection_returns_child_ids() -> None:
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.json.return_value = {
|
||||||
|
"response": {
|
||||||
|
"collectiondetails": [
|
||||||
|
{
|
||||||
|
"publishedfileid": "555",
|
||||||
|
"result": 1,
|
||||||
|
"children": [
|
||||||
|
{"publishedfileid": "1001", "filetype": 0},
|
||||||
|
{"publishedfileid": "1002", "filetype": 0},
|
||||||
|
{"publishedfileid": "9999", "filetype": 1}, # nested collection — skip
|
||||||
|
],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(post=MagicMock(return_value=fake_response))):
|
||||||
|
ids = steam_workshop.resolve_collection("555")
|
||||||
|
assert ids == ["1001", "1002"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_metadata_batch_parses_published_file_details() -> None:
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.json.return_value = {
|
||||||
|
"response": {
|
||||||
|
"publishedfiledetails": [
|
||||||
|
{
|
||||||
|
"publishedfileid": "1001",
|
||||||
|
"result": 1,
|
||||||
|
"consumer_app_id": 550,
|
||||||
|
"title": "Map A",
|
||||||
|
"filename": "map_a.vpk",
|
||||||
|
"file_url": "https://steamusercontent.com/abc/map_a.vpk",
|
||||||
|
"file_size": "1024",
|
||||||
|
"time_updated": 1700000000,
|
||||||
|
"preview_url": "https://steamuserimages.com/preview_a.jpg",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(post=MagicMock(return_value=fake_response))):
|
||||||
|
metas = steam_workshop.fetch_metadata_batch(["1001"], mode="add")
|
||||||
|
assert len(metas) == 1
|
||||||
|
m = metas[0]
|
||||||
|
assert m.steam_id == "1001"
|
||||||
|
assert m.title == "Map A"
|
||||||
|
assert m.filename == "map_a.vpk"
|
||||||
|
assert m.file_url == "https://steamusercontent.com/abc/map_a.vpk"
|
||||||
|
assert m.file_size == 1024
|
||||||
|
assert m.time_updated == 1700000000
|
||||||
|
assert m.preview_url == "https://steamuserimages.com/preview_a.jpg"
|
||||||
|
assert m.consumer_app_id == 550
|
||||||
|
assert m.result == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_metadata_batch_rejects_non_l4d2_in_add_mode() -> None:
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.json.return_value = {
|
||||||
|
"response": {
|
||||||
|
"publishedfiledetails": [
|
||||||
|
{
|
||||||
|
"publishedfileid": "1001",
|
||||||
|
"result": 1,
|
||||||
|
"consumer_app_id": 440, # TF2
|
||||||
|
"title": "Other",
|
||||||
|
"filename": "x.vpk",
|
||||||
|
"file_url": "https://example.com/x.vpk",
|
||||||
|
"file_size": "0",
|
||||||
|
"time_updated": 0,
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(post=MagicMock(return_value=fake_response))):
|
||||||
|
with pytest.raises(steam_workshop.WorkshopValidationError):
|
||||||
|
steam_workshop.fetch_metadata_batch(["1001"], mode="add")
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_metadata_batch_skips_non_l4d2_in_refresh_mode() -> None:
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.json.return_value = {
|
||||||
|
"response": {
|
||||||
|
"publishedfiledetails": [
|
||||||
|
{
|
||||||
|
"publishedfileid": "1001",
|
||||||
|
"result": 1,
|
||||||
|
"consumer_app_id": 440,
|
||||||
|
"title": "Other",
|
||||||
|
"filename": "x.vpk",
|
||||||
|
"file_url": "https://example.com/x.vpk",
|
||||||
|
"file_size": "0",
|
||||||
|
"time_updated": 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"publishedfileid": "1002",
|
||||||
|
"result": 1,
|
||||||
|
"consumer_app_id": 550,
|
||||||
|
"title": "Good",
|
||||||
|
"filename": "g.vpk",
|
||||||
|
"file_url": "https://example.com/g.vpk",
|
||||||
|
"file_size": "100",
|
||||||
|
"time_updated": 1,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(post=MagicMock(return_value=fake_response))):
|
||||||
|
metas = steam_workshop.fetch_metadata_batch(["1001", "1002"], mode="refresh")
|
||||||
|
# The non-L4D2 item is dropped; the L4D2 item is kept.
|
||||||
|
assert [m.steam_id for m in metas] == ["1002"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_metadata_batch_captures_result_failure() -> None:
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.json.return_value = {
|
||||||
|
"response": {
|
||||||
|
"publishedfiledetails": [
|
||||||
|
{
|
||||||
|
"publishedfileid": "999",
|
||||||
|
"result": 9, # not found / hidden / etc.
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(post=MagicMock(return_value=fake_response))):
|
||||||
|
metas = steam_workshop.fetch_metadata_batch(["999"], mode="refresh")
|
||||||
|
# Item is kept but marked with the failing result; consumer app id never validated.
|
||||||
|
assert len(metas) == 1
|
||||||
|
assert metas[0].result == 9
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_to_cache_writes_atomically_and_sets_mtime(tmp_path: Path) -> None:
|
||||||
|
cache_root = tmp_path / "workshop_cache"
|
||||||
|
cache_root.mkdir()
|
||||||
|
meta = steam_workshop.WorkshopMetadata(
|
||||||
|
steam_id="1001",
|
||||||
|
title="A",
|
||||||
|
filename="a.vpk",
|
||||||
|
file_url="https://example.com/a.vpk",
|
||||||
|
file_size=11,
|
||||||
|
time_updated=1700000000,
|
||||||
|
preview_url="",
|
||||||
|
consumer_app_id=550,
|
||||||
|
result=1,
|
||||||
|
)
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.iter_content.return_value = [b"hello world"]
|
||||||
|
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(get=MagicMock(return_value=fake_response))):
|
||||||
|
path = steam_workshop.download_to_cache(meta, cache_root)
|
||||||
|
|
||||||
|
assert path == cache_root / "1001.vpk"
|
||||||
|
assert path.read_bytes() == b"hello world"
|
||||||
|
assert int(path.stat().st_mtime) == 1700000000
|
||||||
|
# No leftover .partial file.
|
||||||
|
assert not (cache_root / "1001.vpk.partial").exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_to_cache_is_idempotent(tmp_path: Path) -> None:
|
||||||
|
cache_root = tmp_path / "workshop_cache"
|
||||||
|
cache_root.mkdir()
|
||||||
|
target = cache_root / "1001.vpk"
|
||||||
|
target.write_bytes(b"existing")
|
||||||
|
os.utime(target, (1700000000, 1700000000))
|
||||||
|
|
||||||
|
meta = steam_workshop.WorkshopMetadata(
|
||||||
|
steam_id="1001",
|
||||||
|
title="A",
|
||||||
|
filename="a.vpk",
|
||||||
|
file_url="https://example.com/a.vpk",
|
||||||
|
file_size=8, # matches existing
|
||||||
|
time_updated=1700000000, # matches existing mtime
|
||||||
|
preview_url="",
|
||||||
|
consumer_app_id=550,
|
||||||
|
result=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
fake_session = MagicMock()
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=fake_session):
|
||||||
|
steam_workshop.download_to_cache(meta, cache_root)
|
||||||
|
|
||||||
|
fake_session.get.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_to_cache_redownloads_when_mtime_or_size_differ(tmp_path: Path) -> None:
|
||||||
|
cache_root = tmp_path / "workshop_cache"
|
||||||
|
cache_root.mkdir()
|
||||||
|
target = cache_root / "1001.vpk"
|
||||||
|
target.write_bytes(b"old")
|
||||||
|
os.utime(target, (1500000000, 1500000000))
|
||||||
|
|
||||||
|
meta = steam_workshop.WorkshopMetadata(
|
||||||
|
steam_id="1001",
|
||||||
|
title="A",
|
||||||
|
filename="a.vpk",
|
||||||
|
file_url="https://example.com/a.vpk",
|
||||||
|
file_size=11,
|
||||||
|
time_updated=1700000000,
|
||||||
|
preview_url="",
|
||||||
|
consumer_app_id=550,
|
||||||
|
result=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
fake_response = MagicMock(status_code=200)
|
||||||
|
fake_response.raise_for_status = MagicMock()
|
||||||
|
fake_response.iter_content.return_value = [b"hello world"]
|
||||||
|
|
||||||
|
with patch.object(steam_workshop, "_session", return_value=MagicMock(get=MagicMock(return_value=fake_response))):
|
||||||
|
steam_workshop.download_to_cache(meta, cache_root)
|
||||||
|
|
||||||
|
assert target.read_bytes() == b"hello world"
|
||||||
|
assert int(target.stat().st_mtime) == 1700000000
|
||||||
|
|
||||||
|
|
||||||
|
def test_refresh_all_uses_thread_pool_and_collects_errors(tmp_path: Path) -> None:
|
||||||
|
cache_root = tmp_path / "workshop_cache"
|
||||||
|
cache_root.mkdir()
|
||||||
|
|
||||||
|
metas = [
|
||||||
|
steam_workshop.WorkshopMetadata(
|
||||||
|
steam_id=str(i),
|
||||||
|
title=f"M{i}",
|
||||||
|
filename=f"m{i}.vpk",
|
||||||
|
file_url=f"https://example.com/m{i}.vpk",
|
||||||
|
file_size=5,
|
||||||
|
time_updated=1700000000,
|
||||||
|
preview_url="",
|
||||||
|
consumer_app_id=550,
|
||||||
|
result=1,
|
||||||
|
)
|
||||||
|
for i in (1, 2, 3)
|
||||||
|
]
|
||||||
|
|
||||||
|
def fake_download(meta, cache_root_arg, **kwargs):
|
||||||
|
if meta.steam_id == "2":
|
||||||
|
raise RuntimeError("simulated download failure")
|
||||||
|
return cache_root_arg / f"{meta.steam_id}.vpk"
|
||||||
|
|
||||||
|
with patch.object(steam_workshop, "download_to_cache", side_effect=fake_download):
|
||||||
|
report = steam_workshop.refresh_all(metas, cache_root, executor_workers=4)
|
||||||
|
|
||||||
|
assert report.downloaded == 2
|
||||||
|
assert report.errors == 1
|
||||||
|
assert "2" in report.per_item_errors
|
||||||
Loading…
Reference in a new issue