"""Download and extraction helpers for the global overlay VPK cache.

Archives (zip / 7z) are downloaded into a per-source cache directory and the
``.vpk`` files they contain are extracted flat (basenames only) into a
``vpks`` directory, with path-traversal protection on archive member names.
"""

from __future__ import annotations

import hashlib
import os
import shutil
import tempfile
from pathlib import Path
from zipfile import ZipFile

import py7zr
import requests

from l4d2host.paths import get_left4me_root

# Network timeout for the archive download request (seconds).
REQUEST_TIMEOUT_SECONDS = 30
# Streaming download chunk size (1 MiB).
DOWNLOAD_CHUNK_BYTES = 1_048_576


def global_overlay_cache_root() -> Path:
    """Return the root directory holding every source's overlay cache."""
    return get_left4me_root() / "global_overlay_cache"


def source_cache_root(source_key: str) -> Path:
    """Return the cache directory for one source.

    Raises:
        ValueError: if ``source_key`` is empty or could escape the cache
            root (contains ``/`` or ``..``).
    """
    if "/" in source_key or ".." in source_key or not source_key:
        raise ValueError(f"invalid source_key: {source_key!r}")
    return global_overlay_cache_root() / source_key


def archive_dir(source_key: str) -> Path:
    """Return the directory where downloaded archives for a source are kept."""
    return source_cache_root(source_key) / "archives"


def vpk_dir(source_key: str) -> Path:
    """Return the directory where extracted ``.vpk`` files for a source live."""
    return source_cache_root(source_key) / "vpks"


def download_archive(url: str, target: Path, *, should_cancel=None) -> tuple[str, str, int | None]:
    """Stream ``url`` to ``target``, writing through a ``.partial`` temp file.

    The partial file is atomically renamed onto ``target`` only after the
    whole body has been written, so ``target`` is never left half-written.

    Args:
        url: HTTP(S) URL of the archive.
        target: Final destination path; parent directories are created.
        should_cancel: Optional zero-arg callable polled between chunks;
            returning True aborts the download with ``InterruptedError``.

    Returns:
        ``(etag, last_modified, content_length)`` taken from the response
        headers. ``etag``/``last_modified`` are ``""`` when absent;
        ``content_length`` is ``None`` when absent or non-numeric.

    Raises:
        requests.HTTPError: for non-2xx responses.
        InterruptedError: when ``should_cancel`` returns True mid-download.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    partial = target.with_suffix(target.suffix + ".partial")
    # Close the streaming connection deterministically via the context
    # manager (the original leaked the connection until GC when stream=True).
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS) as response:
        response.raise_for_status()
        etag = response.headers.get("ETag", "")
        last_modified = response.headers.get("Last-Modified", "")
        content_length_raw = response.headers.get("Content-Length")
        content_length = (
            int(content_length_raw)
            if content_length_raw and content_length_raw.isdigit()
            else None
        )
        try:
            with open(partial, "wb") as f:
                for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                    if should_cancel is not None and should_cancel():
                        raise InterruptedError("download cancelled")
                    if chunk:
                        f.write(chunk)
            # Atomic rename: readers never observe a partially written target.
            os.replace(partial, target)
        except BaseException:
            # Clean up the partial file on ANY failure, including
            # cancellation and KeyboardInterrupt, then re-raise.
            partial.unlink(missing_ok=True)
            raise
    return etag, last_modified, content_length


def safe_extract_zip_vpks(archive_path: Path, output_dir: Path) -> list[Path]:
    """Extract every ``.vpk`` member of a zip archive flat into ``output_dir``.

    Member paths are validated against absolute paths and ``..`` components
    before extraction; only basenames are used for the output files.

    Returns:
        Sorted list of extracted file paths.

    Raises:
        ValueError: on an unsafe member name, or when the archive contains
            no ``.vpk`` files.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    extracted: list[Path] = []
    with ZipFile(archive_path) as zf:
        for member in zf.infolist():
            name = Path(member.filename)
            if name.is_absolute() or any(part in {"", ".", ".."} for part in name.parts):
                raise ValueError(f"unsafe archive member: {member.filename}")
            # Skip directory entries: a directory named "foo.vpk/" would
            # otherwise pass the suffix check and yield an empty output file.
            if member.is_dir():
                continue
            if name.suffix.lower() != ".vpk":
                continue
            target = output_dir / name.name
            with zf.open(member) as src, open(target, "wb") as dst:
                shutil.copyfileobj(src, dst)
            extracted.append(target)
    if not extracted:
        raise ValueError(f"archive {archive_path} did not contain any .vpk files")
    return sorted(extracted)


def safe_extract_7z_vpks(archive_path: Path, output_dir: Path) -> list[Path]:
    """Extract every ``.vpk`` member of a 7z archive flat into ``output_dir``.

    All member names are vetted for traversal first, then the archive is
    fully extracted into a temporary directory and any ``.vpk`` files found
    anywhere in that tree are moved into ``output_dir`` by basename.

    Returns:
        Sorted list of extracted file paths.

    Raises:
        ValueError: on an unsafe member name, or when the archive contains
            no ``.vpk`` files.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    with tempfile.TemporaryDirectory(prefix="left4me-7z-") as raw_tmp:
        raw_dir = Path(raw_tmp)
        with py7zr.SevenZipFile(archive_path, mode="r") as archive:
            # Vet every member name before touching the filesystem.
            names = archive.getnames()
            for name in names:
                p = Path(name)
                if p.is_absolute() or any(part in {"", ".", ".."} for part in p.parts):
                    raise ValueError(f"unsafe archive member: {name}")
            archive.extractall(path=raw_dir)
        extracted: list[Path] = []
        for candidate in raw_dir.rglob("*.vpk"):
            target = output_dir / candidate.name
            shutil.move(str(candidate), str(target))
            extracted.append(target)
    if not extracted:
        raise ValueError(f"archive {archive_path} did not contain any .vpk files")
    return sorted(extracted)


def extracted_vpk_md5(path: Path) -> str:
    """Return the hex MD5 digest of ``path``, read in 1 MiB chunks.

    MD5 is used here as a content fingerprint for cache validation,
    not for security.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()