left4me/l4d2web/l4d2web/services/steam_workshop.py
mwiegand 49992b3a26
refactor(repo): uv workspace + hatchling + layout restructure
Migrate from pip-install-e + setuptools to a uv workspace with a
committed uv.lock for deterministic deps. Switch both members to
hatchling, and move package sources into nested standard layout
(l4d2host/l4d2host/, l4d2web/l4d2web/) so builds work from a
read-only source tree — setuptools wrote egg-info to source under
the old layout, which broke uv sync on the root-owned /opt/left4me/src.

Local dev install: `pip install -e ./l4d2host -e ./l4d2web` -> `uv sync`.
.envrc switches from `layout python python3.13` to `use uv`. Python
pinned to 3.13 via .python-version.

l4d2web now declares its cross-dep on l4d2host explicitly via
[tool.uv.sources] (workspace = true). l4d2web/alembic.ini and
l4d2web/alembic/ stay at the project root (standard alembic layout).

Test fixes:
- tests/__init__.py added to both test dirs so pytest doesn't shadow
  l4d2host as a namespace package via outer-dir walk.
- 3 CWD-relative paths in tests (l4d2web/static/css/{tokens,layout}.css
  and js/sse.js) anchored to Path(__file__) so they survive layout
  changes.
- Two test_install.py tests now monkeypatch HOME to tmp_path so they
  stop silently mutating ~/.steam/sdk32 on every run.

628 tests pass under sandboxed `uv run pytest`.

Per docs/superpowers/plans/2026-05-15-uv-workspace-execution.md;
prereq for the ckn-bw bundle's uv-sync action (queued).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 22:04:29 +02:00

295 lines
9.5 KiB
Python

"""Steam Workshop API client + downloader.
Pure HTTP/file logic — no DB writes, no Flask, no job-worker integration.
Used by the workshop overlay builder and the admin refresh job.
Endpoints:
- GetCollectionDetails: resolve a collection ID to its child item IDs.
- GetPublishedFileDetails: batch-fetch metadata for items, including a public
file_url for the .vpk.
Both endpoints accept anonymous POSTs; no Steam Web API key required.
"""
from __future__ import annotations
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable, Literal
import requests
# HTTPS only (decision 16). The reference downloader uses HTTP — we don't.
GET_PUBLISHED_FILE_DETAILS_URL = (
"https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
)
GET_COLLECTION_DETAILS_URL = (
"https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
)
L4D2_APP_ID = 550
REQUEST_TIMEOUT_SECONDS = 30
DOWNLOAD_CHUNK_BYTES = 1_048_576
_NUMERIC_ID_RE = re.compile(r"^\d+$")
_URL_ID_RE = re.compile(r"^https?://([a-z0-9.-]*\.)?steamcommunity\.com/.*[?&]id=(\d+)", re.IGNORECASE)
_BARE_URL_ID_RE = re.compile(r"^([a-z0-9.-]*\.)?steamcommunity\.com/.*[?&]id=(\d+)", re.IGNORECASE)
# One attribute namespace per thread; `_session()` lazily stores a Session on it.
_session_local = threading.local()


def _session() -> requests.Session:
    """Return the calling thread's `requests.Session`, creating it on first use.

    Each thread gets its own session so connections are reused within a
    thread without sharing a session across threads.
    """
    existing = getattr(_session_local, "session", None)
    if existing is not None:
        return existing
    created = requests.Session()
    _session_local.session = created
    return created
class WorkshopValidationError(ValueError):
    """A workshop item violated a fixed precondition during user-add.

    Example: the item's consumer_app_id is not 550.
    """
@dataclass(slots=True)
class WorkshopMetadata:
steam_id: str
title: str
filename: str
file_url: str
file_size: int
time_updated: int
preview_url: str
consumer_app_id: int
result: int
@dataclass(slots=True)
class RefreshReport:
downloaded: int = 0
skipped: int = 0
errors: int = 0
per_item_errors: dict[str, str] = field(default_factory=dict)
def parse_workshop_input(raw: str) -> list[str]:
    """Turn user input into an ordered, duplicate-free list of workshop ids.

    `raw` may hold a single id, a single workshop URL, or any whitespace- or
    newline-separated mix of the two. Every token is converted before
    de-duplication, so one unparseable token fails the whole input.

    Raises:
        ValueError: if `raw` is empty or blank, or a token yields no id.
    """
    if not (raw and raw.strip()):
        raise ValueError("input is empty")
    # str.split() with no separator splits on whitespace runs and never
    # yields empty tokens.
    ids = [_extract_id(piece) for piece in raw.split()]
    # dict keys keep first-seen order, which gives an ordered de-duplication.
    return list(dict.fromkeys(ids))
def _extract_id(token: str) -> str:
    """Return the workshop id contained in one whitespace-free token.

    Accepts a bare numeric id, a steamcommunity.com URL with an `id=` query
    parameter, or the same URL written without a scheme.

    Raises:
        ValueError: if the token matches none of those forms.
    """
    if _NUMERIC_ID_RE.fullmatch(token) is not None:
        return token
    # Scheme-qualified form first, then the scheme-less one; the id is
    # capture group 2 in both patterns.
    for pattern in (_URL_ID_RE, _BARE_URL_ID_RE):
        found = pattern.match(token)
        if found is not None:
            return found.group(2)
    raise ValueError(f"could not parse a Steam workshop id from: {token!r}")
def resolve_collection(collection_id: str) -> list[str]:
    """Look up one collection and return the ids of its direct item children.

    Sends a single POST to GetCollectionDetails. Children whose `filetype`
    is non-zero (nested collections) are left out, as are children without a
    `publishedfileid`. Response order is kept.

    Raises:
        ValueError: if `collection_id` is not digits only.
        requests.HTTPError: if the API answers with an error status.
    """
    if _NUMERIC_ID_RE.fullmatch(collection_id) is None:
        raise ValueError("collection_id must be digits only")

    form = {
        "collectioncount": 1,
        "publishedfileids[0]": collection_id,
    }
    reply = _session().post(
        GET_COLLECTION_DETAILS_URL,
        data=form,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    reply.raise_for_status()
    details = reply.json().get("response", {}).get("collectiondetails", [])

    item_ids: list[str] = []
    for collection in details:
        for child in collection.get("children", []):
            is_plain_item = child.get("filetype", 0) == 0
            child_id = child.get("publishedfileid")
            if is_plain_item and child_id is not None:
                item_ids.append(str(child_id))
    return item_ids
def fetch_metadata_batch(
    steam_ids: list[str], *, mode: Literal["add", "refresh"]
) -> list[WorkshopMetadata]:
    """Fetch metadata for all `steam_ids` with one GetPublishedFileDetails POST.

    An empty `steam_ids` returns `[]` without any network traffic.

    Handling of a successfully looked-up item (`result == 1`) whose
    `consumer_app_id` is not 550 depends on `mode`:

    - `"add"`: raise WorkshopValidationError so the user-add request fails.
    - `"refresh"`: leave the item out of the returned list.

    Items with `result != 1` are returned unchanged; the caller persists the
    result code into `WorkshopItem.last_error`.

    Raises:
        ValueError: if any id is not digits only.
        WorkshopValidationError: see `mode="add"` above.
        requests.HTTPError: if the API answers with an error status.
    """
    if not steam_ids:
        return []
    for candidate in steam_ids:
        if not _NUMERIC_ID_RE.fullmatch(candidate):
            raise ValueError(f"steam id must be digits only: {candidate!r}")

    form: dict[str, str | int] = {"itemcount": len(steam_ids)}
    form.update(
        {f"publishedfileids[{position}]": sid for position, sid in enumerate(steam_ids)}
    )
    reply = _session().post(
        GET_PUBLISHED_FILE_DETAILS_URL,
        data=form,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    reply.raise_for_status()
    details = reply.json().get("response", {}).get("publishedfiledetails", [])

    def text(entry, key):
        # Missing or null text fields collapse to "".
        return str(entry.get(key, "") or "")

    def number(entry, key):
        # Missing or null numeric fields collapse to 0.
        return int(entry.get(key) or 0)

    collected: list[WorkshopMetadata] = []
    for entry in details:
        meta = WorkshopMetadata(
            steam_id=str(entry.get("publishedfileid", "")),
            title=text(entry, "title"),
            filename=text(entry, "filename"),
            file_url=text(entry, "file_url"),
            file_size=number(entry, "file_size"),
            time_updated=number(entry, "time_updated"),
            preview_url=text(entry, "preview_url"),
            consumer_app_id=number(entry, "consumer_app_id"),
            result=number(entry, "result"),
        )
        # consumer_app_id is only meaningful when the lookup itself succeeded.
        wrong_game = meta.result == 1 and meta.consumer_app_id != L4D2_APP_ID
        if not wrong_game:
            collected.append(meta)
        elif mode == "add":
            raise WorkshopValidationError(
                f"item {meta.steam_id} is not a Left 4 Dead 2 workshop "
                f"item (consumer_app_id={meta.consumer_app_id})"
            )
        # Any other mode (refresh): the entry is dropped silently.
    return collected
def download_to_cache(
    meta: WorkshopMetadata,
    cache_root: Path,
    *,
    on_progress: Callable[[int, int], None] | None = None,
    should_cancel: Callable[[], bool] | None = None,
) -> Path:
    """Download `meta.file_url` to `cache_root/{steam_id}.vpk`.

    Atomic via `*.partial` + `os.replace`. Idempotent: a no-op when the
    existing file's `(mtime, size)` already matches `(time_updated, file_size)`.
    The mtime is set to `time_updated` on the partial file *before* it is
    renamed into place, so the target never exists with a stale timestamp and
    the next run short-circuits.

    Args:
        meta: Item to download; `steam_id` must be digits only.
        cache_root: Cache directory, created if missing.
        on_progress: Called as `on_progress(bytes_written, meta.file_size)`
            after each chunk is written.
        should_cancel: Polled before each chunk is handled; a truthy result
            aborts the download.

    Returns:
        Path of the cached `.vpk`.

    Raises:
        ValueError: if `meta.steam_id` is not digits only, or a download is
            needed but `meta.file_url` is empty.
        InterruptedError: if `should_cancel` reported cancellation.
        requests.HTTPError: if the download URL answers with an error status.
    """
    if not _NUMERIC_ID_RE.fullmatch(meta.steam_id):
        raise ValueError("meta.steam_id must be digits only")
    cache_root.mkdir(parents=True, exist_ok=True)
    target = cache_root / f"{meta.steam_id}.vpk"
    expected_size = int(meta.file_size)

    # A single stat() answers "exists?", mtime and size together, which avoids
    # racing a concurrent delete between separate exists()/stat() calls.
    try:
        cached = target.stat()
    except OSError:
        cached = None
    if (
        cached is not None
        and int(cached.st_mtime) == int(meta.time_updated)
        and int(cached.st_size) == expected_size
    ):
        return target

    if not meta.file_url:
        raise ValueError(f"item {meta.steam_id} has no file_url; cannot download")

    partial = target.with_suffix(target.suffix + ".partial")
    response = _session().get(meta.file_url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS)
    try:
        response.raise_for_status()
        written = 0
        try:
            with open(partial, "wb") as f:
                for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                    if should_cancel is not None and should_cancel():
                        raise InterruptedError("download cancelled")
                    if not chunk:
                        continue
                    f.write(chunk)
                    written += len(chunk)
                    if on_progress is not None:
                        on_progress(written, expected_size)
            # Stamp first, publish second: a rename keeps the file's mtime, so
            # the target becomes visible already carrying `time_updated`.
            os.utime(partial, (meta.time_updated, meta.time_updated))
            os.replace(partial, target)
        except BaseException:
            # Never leave a half-written partial behind, even on
            # KeyboardInterrupt / SystemExit.
            partial.unlink(missing_ok=True)
            raise
    finally:
        # With stream=True the connection is only handed back once the body
        # is fully consumed or the response is closed. Closing here covers
        # HTTP errors, cancellation and write failures, which would otherwise
        # leave the connection checked out of the per-thread session.
        response.close()
    return target
def refresh_all(
    metas: Iterable[WorkshopMetadata],
    cache_root: Path,
    *,
    executor_workers: int = 8,
    should_cancel: Callable[[], bool] | None = None,
) -> RefreshReport:
    """Run `download_to_cache` for every item on a thread pool.

    A failing item is recorded in the report and does not stop its siblings.
    `should_cancel` is polled before each submission (a truthy result stops
    further submissions) and is also handed to every download.

    Args:
        metas: Items to fetch; consumed once into a list.
        cache_root: Cache directory, created if missing.
        executor_workers: Thread pool size.
        should_cancel: Optional cancellation probe.

    Returns:
        A RefreshReport with `downloaded`, `errors` and `per_item_errors`
        filled in.
    """
    report = RefreshReport()
    pending = list(metas)
    if not pending:
        return report

    cache_root.mkdir(parents=True, exist_ok=True)
    with ThreadPoolExecutor(max_workers=executor_workers) as pool:
        item_for_job = {}
        for item in pending:
            if should_cancel is not None and should_cancel():
                break
            job = pool.submit(
                download_to_cache,
                item,
                cache_root,
                should_cancel=should_cancel,
            )
            item_for_job[job] = item

        for job in as_completed(item_for_job):
            try:
                job.result()
            except Exception as exc:
                report.errors += 1
                report.per_item_errors[item_for_job[job].steam_id] = str(exc)
            else:
                # NOTE(review): cache hits land here too, and `skipped` is
                # never touched — confirm whether that is intended.
                report.downloaded += 1
    return report