"""Steam Workshop API client + downloader.

Pure HTTP/file logic — no DB writes, no Flask, no job-worker integration.
Used by the workshop overlay builder and the admin refresh job.

Endpoints:

- GetCollectionDetails: resolve a collection ID to its child item IDs.
- GetPublishedFileDetails: batch-fetch metadata for items, including a public
  file_url for the .vpk.

Both endpoints accept anonymous POSTs; no Steam Web API key required.
"""

from __future__ import annotations

import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable, Literal

import requests

# HTTPS only (decision 16). The reference downloader uses HTTP — we don't.
GET_PUBLISHED_FILE_DETAILS_URL = (
    "https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
)
GET_COLLECTION_DETAILS_URL = (
    "https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
)
L4D2_APP_ID = 550
REQUEST_TIMEOUT_SECONDS = 30
DOWNLOAD_CHUNK_BYTES = 1_048_576

# re.ASCII: without it `\d` also matches non-ASCII digits (e.g. Arabic-Indic
# numerals), which are never valid Steam ids and would otherwise parse.
_NUMERIC_ID_RE = re.compile(r"^\d+$", re.ASCII)
_URL_ID_RE = re.compile(
    r"^https?://([a-z0-9.-]*\.)?steamcommunity\.com/.*[?&]id=(\d+)",
    re.IGNORECASE | re.ASCII,
)
_BARE_URL_ID_RE = re.compile(
    r"^([a-z0-9.-]*\.)?steamcommunity\.com/.*[?&]id=(\d+)",
    re.IGNORECASE | re.ASCII,
)

_session_local = threading.local()


def _session() -> requests.Session:
    """Per-thread session for connection reuse without cross-thread leakage."""
    sess = getattr(_session_local, "session", None)
    if sess is None:
        sess = requests.Session()
        _session_local.session = sess
    return sess


class WorkshopValidationError(ValueError):
    """Raised during user-add when an item fails a fixed precondition
    (e.g. consumer_app_id != 550)."""


@dataclass(slots=True)
class WorkshopMetadata:
    """One GetPublishedFileDetails entry, normalised to plain Python types."""

    steam_id: str
    title: str
    filename: str
    file_url: str
    file_size: int  # byte size; compared against the cached file's st_size
    time_updated: int  # epoch seconds; written as the cached file's mtime
    preview_url: str
    consumer_app_id: int  # must equal L4D2_APP_ID (550) for a usable item
    result: int  # lookup result code; 1 means the lookup succeeded


@dataclass(slots=True)
class RefreshReport:
    """Counters describing one ``refresh_all`` run."""

    downloaded: int = 0  # items fetched over HTTP
    skipped: int = 0  # items already current in the cache, or duplicate ids
    errors: int = 0  # items whose refresh raised
    per_item_errors: dict[str, str] = field(default_factory=dict)  # steam_id -> message


def parse_workshop_input(raw: str) -> list[str]:
    """Parse workshop ids out of free-form user input.

    Accepts a single ID, a single workshop URL, or a multi-line /
    whitespace-separated batch of either.

    Returns:
        Deduplicated digit-only IDs, in first-seen order.

    Raises:
        ValueError: the input is empty, or a token is neither a numeric id
            nor a recognisable steamcommunity.com URL with an ``id=`` param.
    """
    if not raw or not raw.strip():
        raise ValueError("input is empty")
    ids = [_extract_id(token) for token in raw.split()]
    # dict keeps insertion order, so this dedupes while preserving order.
    return list(dict.fromkeys(ids))


def _extract_id(token: str) -> str:
    """Return the digit-only workshop id contained in one input token.

    Accepts a bare numeric id, an ``http(s)://`` steamcommunity.com URL, or
    the same URL without a scheme; the id comes from its ``id=`` parameter.

    Raises:
        ValueError: no id could be extracted.
    """
    if _NUMERIC_ID_RE.fullmatch(token):
        return token
    for pattern in (_URL_ID_RE, _BARE_URL_ID_RE):
        m = pattern.match(token)
        if m:
            return m.group(2)
    raise ValueError(f"could not parse a Steam workshop id from: {token!r}")


def resolve_collection(collection_id: str) -> list[str]:
    """POST GetCollectionDetails for one collection; return its
    non-collection child publishedfileids in order.

    Nested collections (filetype != 0) are skipped.

    Raises:
        ValueError: ``collection_id`` is not digits only.
        requests.HTTPError: the API answered with a non-2xx status.
    """
    if not _NUMERIC_ID_RE.fullmatch(collection_id):
        raise ValueError("collection_id must be digits only")
    response = _session().post(
        GET_COLLECTION_DETAILS_URL,
        data={
            "collectioncount": 1,
            "publishedfileids[0]": collection_id,
        },
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    payload = response.json()

    children: list[str] = []
    for collection in payload.get("response", {}).get("collectiondetails", []):
        for child in collection.get("children", []):
            if child.get("filetype", 0) != 0:
                continue  # nested collection, skip
            child_id = child.get("publishedfileid")
            if child_id is not None:
                children.append(str(child_id))
    return children


def fetch_metadata_batch(
    steam_ids: list[str], *, mode: Literal["add", "refresh"]
) -> list[WorkshopMetadata]:
    """One POST to GetPublishedFileDetails covering all ids.

    In ``mode="add"``, any non-L4D2 item (``consumer_app_id != 550``) raises
    WorkshopValidationError so the user-add request fails cleanly.
    In ``mode="refresh"``, non-L4D2 entries are skipped from the result.
    Items with ``result != 1`` are returned as-is (the caller persists the
    result code into ``WorkshopItem.last_error``).

    Raises:
        ValueError: ``mode`` is not "add"/"refresh", or an id is not digits only.
        WorkshopValidationError: ``mode="add"`` and an item is not an L4D2 item.
        requests.HTTPError: the API answered with a non-2xx status.
    """
    if mode not in ("add", "refresh"):
        # Previously any unknown mode silently behaved like "refresh".
        raise ValueError(f"mode must be 'add' or 'refresh', got {mode!r}")
    if not steam_ids:
        return []
    for sid in steam_ids:
        if not _NUMERIC_ID_RE.fullmatch(sid):
            raise ValueError(f"steam id must be digits only: {sid!r}")

    payload: dict[str, str | int] = {"itemcount": len(steam_ids)}
    payload.update(
        {f"publishedfileids[{index}]": sid for index, sid in enumerate(steam_ids)}
    )
    response = _session().post(
        GET_PUBLISHED_FILE_DETAILS_URL,
        data=payload,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    body = response.json()

    metas: list[WorkshopMetadata] = []
    for entry in body.get("response", {}).get("publishedfiledetails", []):
        meta = WorkshopMetadata(
            steam_id=str(entry.get("publishedfileid", "")),
            title=str(entry.get("title", "") or ""),
            filename=str(entry.get("filename", "") or ""),
            file_url=str(entry.get("file_url", "") or ""),
            file_size=int(entry.get("file_size") or 0),
            time_updated=int(entry.get("time_updated") or 0),
            preview_url=str(entry.get("preview_url", "") or ""),
            consumer_app_id=int(entry.get("consumer_app_id") or 0),
            result=int(entry.get("result") or 0),
        )
        # consumer_app_id is only meaningful when the lookup itself succeeded.
        if meta.result == 1 and meta.consumer_app_id != L4D2_APP_ID:
            if mode == "add":
                raise WorkshopValidationError(
                    f"item {meta.steam_id} is not a Left 4 Dead 2 workshop "
                    f"item (consumer_app_id={meta.consumer_app_id})"
                )
            # refresh mode: drop the entry silently from the batch
            continue
        metas.append(meta)
    return metas


def _cache_target(meta: WorkshopMetadata, cache_root: Path) -> Path:
    """Validate ``meta.steam_id`` and return its ``cache_root/{id}.vpk`` path."""
    # Digits-only also guarantees the id cannot escape cache_root as a path.
    if not _NUMERIC_ID_RE.fullmatch(meta.steam_id):
        raise ValueError("meta.steam_id must be digits only")
    return cache_root / f"{meta.steam_id}.vpk"


def _is_cached(target: Path, meta: WorkshopMetadata) -> bool:
    """True when ``target``'s ``(mtime, size)`` equals ``(time_updated, file_size)``."""
    try:
        st = target.stat()
    except OSError:
        return False
    return int(st.st_mtime) == int(meta.time_updated) and int(st.st_size) == int(
        meta.file_size
    )


def download_to_cache(
    meta: WorkshopMetadata,
    cache_root: Path,
    *,
    on_progress: Callable[[int, int], None] | None = None,
    should_cancel: Callable[[], bool] | None = None,
) -> Path:
    """Download ``meta.file_url`` to ``cache_root/{steam_id}.vpk``.

    Atomic via ``*.partial`` + ``os.replace``: the partial file is stamped
    with ``os.utime(..., (time_updated, time_updated))`` *before* the
    replace, so the target only ever appears complete and already carrying
    its final mtime. Idempotent: a no-op when the existing file's
    ``(mtime, size)`` already matches ``(time_updated, file_size)``, which
    the mtime stamp makes true on the next run.

    Args:
        meta: item metadata; ``steam_id`` must be digits only.
        cache_root: cache directory, created if missing.
        on_progress: called after each written chunk as
            ``(bytes_written, file_size)``.
        should_cancel: polled before the request and between chunks; a True
            result aborts with ``InterruptedError`` and removes the partial.

    Returns:
        Path of the cached ``.vpk`` file.

    Raises:
        ValueError: invalid ``steam_id`` or empty ``file_url``.
        InterruptedError: cancelled via ``should_cancel``.
        OSError: the downloaded byte count differs from a non-zero
            ``file_size``, or a filesystem operation failed.
        requests.HTTPError: the file URL answered with a non-2xx status.
    """
    target = _cache_target(meta, cache_root)
    cache_root.mkdir(parents=True, exist_ok=True)
    if _is_cached(target, meta):
        return target
    if not meta.file_url:
        raise ValueError(f"item {meta.steam_id} has no file_url; cannot download")
    if should_cancel is not None and should_cancel():
        # Don't open a connection for work that is already cancelled.
        raise InterruptedError("download cancelled")

    partial = target.with_suffix(target.suffix + ".partial")
    expected = int(meta.file_size)
    # The context manager closes the streamed response on every path, so an
    # error or cancellation mid-body doesn't leak the pooled connection.
    with _session().get(
        meta.file_url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS
    ) as response:
        response.raise_for_status()
        written = 0
        try:
            with open(partial, "wb") as fh:
                for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                    if should_cancel is not None and should_cancel():
                        raise InterruptedError("download cancelled")
                    if not chunk:
                        continue
                    fh.write(chunk)
                    written += len(chunk)
                    if on_progress is not None:
                        on_progress(written, expected)
            # A short/long body would otherwise be stamped as a valid cache entry.
            if expected and written != expected:
                raise OSError(
                    f"item {meta.steam_id} download size mismatch: "
                    f"expected {expected} bytes, got {written}"
                )
            # Stamp before publishing so the replace is the only visible step.
            os.utime(partial, (meta.time_updated, meta.time_updated))
            os.replace(partial, target)
        except BaseException:
            partial.unlink(missing_ok=True)
            raise
    return target


def _refresh_one(
    meta: WorkshopMetadata,
    cache_root: Path,
    should_cancel: Callable[[], bool] | None,
) -> bool:
    """Bring one cache entry up to date; True if downloaded, False if already current."""
    if _is_cached(_cache_target(meta, cache_root), meta):
        return False
    download_to_cache(meta, cache_root, should_cancel=should_cancel)
    return True


def refresh_all(
    metas: Iterable[WorkshopMetadata],
    cache_root: Path,
    *,
    executor_workers: int = 8,
    should_cancel: Callable[[], bool] | None = None,
) -> RefreshReport:
    """Download (or skip-as-cached) every metadata item using a thread pool.

    Per-item errors are collected; sibling items continue. Items already
    current in the cache count as ``skipped``, as do repeated ``steam_id``
    entries (only the first occurrence is processed). When ``should_cancel``
    turns True, no further items are submitted; in-flight downloads abort
    and are reported as errors.
    """
    report = RefreshReport()
    unique: dict[str, WorkshopMetadata] = {}
    for meta in metas:
        if meta.steam_id in unique:
            # Two workers on one id would race on the same ``.partial`` file.
            report.skipped += 1
        else:
            unique[meta.steam_id] = meta
    if not unique:
        return report

    cache_root.mkdir(parents=True, exist_ok=True)
    with ThreadPoolExecutor(max_workers=executor_workers) as executor:
        futures = {}
        for meta in unique.values():
            if should_cancel is not None and should_cancel():
                break
            future = executor.submit(_refresh_one, meta, cache_root, should_cancel)
            futures[future] = meta

        for future in as_completed(futures):
            meta = futures[future]
            try:
                was_downloaded = future.result()
            except Exception as exc:
                report.errors += 1
                report.per_item_errors[meta.steam_id] = str(exc)
            else:
                if was_downloaded:
                    report.downloaded += 1
                else:
                    report.skipped += 1
    return report