refactor(l4d2-web): drop global-overlays subsystem in favor of script type
Deletes the global_map_sources, global_overlay_refresh, global_map_cache, and global_overlays service modules and their tests. Removes the refresh-global-overlays CLI command, the /admin/global-overlays/refresh route, and the GlobalOverlaySource view in overlay_detail rendering. Drops py7zr from dependencies — was only used by the deleted subsystem. The job_worker scheduler still tracks refresh_global_overlays; that cleanup is Task 4. Deploy/README references are Task 8. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d29afa41fa
commit
9f476e3456
17 changed files with 13 additions and 1292 deletions
|
|
@ -41,20 +41,6 @@ def create_user(username: str, admin: bool) -> None:
|
|||
click.echo(f"created user {username}")
|
||||
|
||||
|
||||
@click.command("refresh-global-overlays")
|
||||
def refresh_global_overlays_command() -> None:
|
||||
from l4d2web.services.global_overlays import (
|
||||
ensure_global_overlays,
|
||||
enqueue_refresh_global_overlays,
|
||||
)
|
||||
|
||||
with session_scope() as db:
|
||||
ensure_global_overlays(db)
|
||||
job = enqueue_refresh_global_overlays(db, user_id=None)
|
||||
click.echo(f"queued refresh_global_overlays job #{job.id}")
|
||||
|
||||
|
||||
def register_cli(app) -> None:
|
||||
app.cli.add_command(promote_admin)
|
||||
app.cli.add_command(create_user)
|
||||
app.cli.add_command(refresh_global_overlays_command)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ dependencies = [
|
|||
"PyYAML>=6.0",
|
||||
"gunicorn>=22.0",
|
||||
"requests>=2.31",
|
||||
"py7zr>=0.21",
|
||||
]
|
||||
|
||||
[tool.setuptools]
|
||||
|
|
|
|||
|
|
@ -8,13 +8,15 @@ from l4d2host.paths import get_left4me_root
|
|||
from l4d2web.auth import current_user, require_login
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import BlueprintOverlay, Overlay
|
||||
from l4d2web.services.global_overlays import MANAGED_GLOBAL_OVERLAY_TYPES, is_creatable_overlay_type
|
||||
from l4d2web.services.overlay_creation import (
|
||||
create_overlay_directory,
|
||||
generate_overlay_path,
|
||||
)
|
||||
|
||||
|
||||
CREATABLE_OVERLAY_TYPES = {"workshop", "script"}
|
||||
|
||||
|
||||
bp = Blueprint("overlay", __name__)
|
||||
|
||||
|
||||
|
|
@ -25,11 +27,9 @@ def _is_managed_path(overlay: Overlay) -> bool:
|
|||
def _can_edit_overlay(overlay: Overlay, user) -> bool:
|
||||
if user is None:
|
||||
return False
|
||||
if overlay.type in MANAGED_GLOBAL_OVERLAY_TYPES:
|
||||
return False
|
||||
if user.admin:
|
||||
return True
|
||||
if overlay.type == "workshop":
|
||||
if overlay.type in {"workshop", "script"}:
|
||||
return overlay.user_id == user.id
|
||||
return False
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ def create_overlay() -> Response:
|
|||
overlay_type = request.form.get("type", "workshop").strip().lower()
|
||||
if not name:
|
||||
return Response("missing fields", status=400)
|
||||
if not is_creatable_overlay_type(overlay_type, admin=user.admin):
|
||||
if overlay_type not in CREATABLE_OVERLAY_TYPES:
|
||||
return Response(f"unknown overlay type: {overlay_type}", status=400)
|
||||
|
||||
scope_user_id: int | None = user.id
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ from l4d2web.db import session_scope
|
|||
from l4d2web.models import Blueprint as BlueprintModel
|
||||
from l4d2web.models import (
|
||||
BlueprintOverlay,
|
||||
GlobalOverlaySource,
|
||||
Job,
|
||||
Overlay,
|
||||
OverlayWorkshopItem,
|
||||
|
|
@ -43,22 +42,6 @@ def enqueue_runtime_install() -> Response:
|
|||
return redirect("/admin/jobs")
|
||||
|
||||
|
||||
@bp.post("/admin/global-overlays/refresh")
|
||||
@require_admin
|
||||
def enqueue_global_overlay_refresh() -> Response:
|
||||
user = current_user()
|
||||
assert user is not None
|
||||
from l4d2web.services.global_overlays import (
|
||||
ensure_global_overlays,
|
||||
enqueue_refresh_global_overlays,
|
||||
)
|
||||
|
||||
with session_scope() as db:
|
||||
ensure_global_overlays(db)
|
||||
enqueue_refresh_global_overlays(db, user_id=user.id)
|
||||
return redirect("/admin/jobs")
|
||||
|
||||
|
||||
@bp.get("/admin/users")
|
||||
@require_admin
|
||||
def admin_users() -> str:
|
||||
|
|
@ -189,9 +172,6 @@ def overlay_detail(overlay_id: int):
|
|||
return Response(status=404)
|
||||
if not user.admin and overlay.user_id is not None and overlay.user_id != user.id:
|
||||
return Response(status=403)
|
||||
global_source = db.scalar(
|
||||
select(GlobalOverlaySource).where(GlobalOverlaySource.overlay_id == overlay.id)
|
||||
)
|
||||
using_blueprints_query = (
|
||||
select(BlueprintModel)
|
||||
.join(BlueprintOverlay, BlueprintOverlay.blueprint_id == BlueprintModel.id)
|
||||
|
|
@ -221,7 +201,6 @@ def overlay_detail(overlay_id: int):
|
|||
return render_template(
|
||||
"overlay_detail.html",
|
||||
overlay=overlay,
|
||||
global_source=global_source,
|
||||
using_blueprints=using_blueprints,
|
||||
workshop_items=workshop_items,
|
||||
latest_build_job=latest_build_job,
|
||||
|
|
|
|||
|
|
@ -1,106 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
from zipfile import ZipFile
|
||||
|
||||
import py7zr
|
||||
import requests
|
||||
|
||||
from l4d2host.paths import get_left4me_root
|
||||
|
||||
|
||||
REQUEST_TIMEOUT_SECONDS = 30
|
||||
DOWNLOAD_CHUNK_BYTES = 1_048_576
|
||||
|
||||
|
||||
def global_overlay_cache_root() -> Path:
|
||||
return get_left4me_root() / "global_overlay_cache"
|
||||
|
||||
|
||||
def source_cache_root(source_key: str) -> Path:
|
||||
if "/" in source_key or ".." in source_key or not source_key:
|
||||
raise ValueError(f"invalid source_key: {source_key!r}")
|
||||
return global_overlay_cache_root() / source_key
|
||||
|
||||
|
||||
def archive_dir(source_key: str) -> Path:
|
||||
return source_cache_root(source_key) / "archives"
|
||||
|
||||
|
||||
def vpk_dir(source_key: str) -> Path:
|
||||
return source_cache_root(source_key) / "vpks"
|
||||
|
||||
|
||||
def download_archive(url: str, target: Path, *, should_cancel=None) -> tuple[str, str, int | None]:
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
partial = target.with_suffix(target.suffix + ".partial")
|
||||
response = requests.get(url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS)
|
||||
response.raise_for_status()
|
||||
etag = response.headers.get("ETag", "")
|
||||
last_modified = response.headers.get("Last-Modified", "")
|
||||
content_length_raw = response.headers.get("Content-Length")
|
||||
content_length = int(content_length_raw) if content_length_raw and content_length_raw.isdigit() else None
|
||||
try:
|
||||
with open(partial, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
|
||||
if should_cancel is not None and should_cancel():
|
||||
raise InterruptedError("download cancelled")
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
os.replace(partial, target)
|
||||
except BaseException:
|
||||
partial.unlink(missing_ok=True)
|
||||
raise
|
||||
return etag, last_modified, content_length
|
||||
|
||||
|
||||
def safe_extract_zip_vpks(archive_path: Path, output_dir: Path) -> list[Path]:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
extracted: list[Path] = []
|
||||
with ZipFile(archive_path) as zf:
|
||||
for member in zf.infolist():
|
||||
name = Path(member.filename)
|
||||
if name.is_absolute() or any(part in {"", ".", ".."} for part in name.parts):
|
||||
raise ValueError(f"unsafe archive member: {member.filename}")
|
||||
if name.suffix.lower() != ".vpk":
|
||||
continue
|
||||
target = output_dir / name.name
|
||||
with zf.open(member) as src, open(target, "wb") as dst:
|
||||
shutil.copyfileobj(src, dst)
|
||||
extracted.append(target)
|
||||
if not extracted:
|
||||
raise ValueError(f"archive {archive_path} did not contain any .vpk files")
|
||||
return sorted(extracted)
|
||||
|
||||
|
||||
def safe_extract_7z_vpks(archive_path: Path, output_dir: Path) -> list[Path]:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
with tempfile.TemporaryDirectory(prefix="left4me-7z-") as raw_tmp:
|
||||
raw_dir = Path(raw_tmp)
|
||||
with py7zr.SevenZipFile(archive_path, mode="r") as archive:
|
||||
names = archive.getnames()
|
||||
for name in names:
|
||||
p = Path(name)
|
||||
if p.is_absolute() or any(part in {"", ".", ".."} for part in p.parts):
|
||||
raise ValueError(f"unsafe archive member: {name}")
|
||||
archive.extractall(path=raw_dir)
|
||||
extracted: list[Path] = []
|
||||
for candidate in raw_dir.rglob("*.vpk"):
|
||||
target = output_dir / candidate.name
|
||||
shutil.move(str(candidate), str(target))
|
||||
extracted.append(target)
|
||||
if not extracted:
|
||||
raise ValueError(f"archive {archive_path} did not contain any .vpk files")
|
||||
return sorted(extracted)
|
||||
|
||||
|
||||
def extracted_vpk_md5(path: Path) -> str:
|
||||
digest = hashlib.md5()
|
||||
with open(path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
||||
digest.update(chunk)
|
||||
return digest.hexdigest()
|
||||
|
|
@ -1,104 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
from dataclasses import dataclass
|
||||
import hashlib
|
||||
import html as html_lib
|
||||
import io
|
||||
import json
|
||||
from urllib.parse import urljoin, urlparse
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
REQUEST_TIMEOUT_SECONDS = 30
|
||||
L4D2CENTER_CSV_URL = "https://l4d2center.com/maps/servers/index.csv"
|
||||
CEDAPUG_CUSTOM_URL = "https://cedapug.com/custom"
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class GlobalMapManifestItem:
|
||||
item_key: str
|
||||
display_name: str
|
||||
download_url: str
|
||||
expected_vpk_name: str = ""
|
||||
expected_size: int | None = None
|
||||
expected_md5: str = ""
|
||||
|
||||
|
||||
def fetch_l4d2center_manifest() -> tuple[str, list[GlobalMapManifestItem]]:
|
||||
response = requests.get(L4D2CENTER_CSV_URL, timeout=REQUEST_TIMEOUT_SECONDS)
|
||||
response.raise_for_status()
|
||||
text = response.text
|
||||
return _sha256(text), parse_l4d2center_csv(text)
|
||||
|
||||
|
||||
def fetch_cedapug_manifest() -> tuple[str, list[GlobalMapManifestItem]]:
|
||||
response = requests.get(CEDAPUG_CUSTOM_URL, timeout=REQUEST_TIMEOUT_SECONDS)
|
||||
response.raise_for_status()
|
||||
text = response.text
|
||||
return _sha256(text), parse_cedapug_custom_html(text)
|
||||
|
||||
|
||||
def parse_l4d2center_csv(raw: str) -> list[GlobalMapManifestItem]:
|
||||
reader = csv.DictReader(io.StringIO(raw), delimiter=";")
|
||||
expected = ["Name", "Size", "md5", "Download link"]
|
||||
if reader.fieldnames != expected:
|
||||
raise ValueError("expected L4D2Center CSV header: Name;Size;md5;Download link")
|
||||
items: list[GlobalMapManifestItem] = []
|
||||
for row in reader:
|
||||
name = (row.get("Name") or "").strip()
|
||||
size_raw = (row.get("Size") or "").strip()
|
||||
md5 = (row.get("md5") or "").strip().lower()
|
||||
url = (row.get("Download link") or "").strip()
|
||||
if not name or not url:
|
||||
continue
|
||||
items.append(
|
||||
GlobalMapManifestItem(
|
||||
item_key=name,
|
||||
display_name=name,
|
||||
download_url=url,
|
||||
expected_vpk_name=name,
|
||||
expected_size=int(size_raw) if size_raw else None,
|
||||
expected_md5=md5,
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def parse_cedapug_custom_html(raw: str) -> list[GlobalMapManifestItem]:
|
||||
match = re.search(r"renderCustomMapDownloads\((\[.*?\])\)</script>", raw, re.DOTALL)
|
||||
if match is None:
|
||||
raise ValueError("CEDAPUG page did not contain renderCustomMapDownloads data")
|
||||
rows = json.loads(match.group(1))
|
||||
items: list[GlobalMapManifestItem] = []
|
||||
for row in rows:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
label = str(row[1])
|
||||
link = str(row[2])
|
||||
if link.startswith("http"):
|
||||
continue
|
||||
if not link:
|
||||
continue
|
||||
url = urljoin(CEDAPUG_CUSTOM_URL, link)
|
||||
parsed = urlparse(url)
|
||||
basename = parsed.path.rsplit("/", 1)[-1]
|
||||
items.append(
|
||||
GlobalMapManifestItem(
|
||||
item_key=basename,
|
||||
display_name=_strip_html(label),
|
||||
download_url=url,
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def _strip_html(raw: str) -> str:
|
||||
no_tags = re.sub(r"<[^>]+>", "", raw)
|
||||
return html_lib.unescape(no_tags).strip()
|
||||
|
||||
|
||||
def _sha256(raw: str) -> str:
|
||||
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||||
|
|
@ -1,168 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource, Overlay
|
||||
from l4d2web.services.global_map_cache import (
|
||||
archive_dir,
|
||||
download_archive,
|
||||
extracted_vpk_md5,
|
||||
safe_extract_7z_vpks,
|
||||
safe_extract_zip_vpks,
|
||||
vpk_dir,
|
||||
)
|
||||
from l4d2web.services.global_map_sources import (
|
||||
GlobalMapManifestItem,
|
||||
fetch_cedapug_manifest,
|
||||
fetch_l4d2center_manifest,
|
||||
)
|
||||
from l4d2web.services.global_overlays import ensure_global_overlays
|
||||
|
||||
|
||||
def refresh_global_overlays(*, on_stdout, on_stderr, should_cancel) -> list[str]:
|
||||
with session_scope() as db:
|
||||
ensure_global_overlays(db)
|
||||
|
||||
refreshed: list[str] = []
|
||||
for source_key, fetcher in (
|
||||
("l4d2center-maps", fetch_l4d2center_manifest),
|
||||
("cedapug-maps", fetch_cedapug_manifest),
|
||||
):
|
||||
if should_cancel():
|
||||
on_stderr("global overlay refresh cancelled before manifest fetch")
|
||||
return refreshed
|
||||
manifest_hash, manifest_items = fetcher()
|
||||
on_stdout(f"{source_key}: fetched manifest with {len(manifest_items)} item(s)")
|
||||
overlay = _refresh_source(
|
||||
source_key,
|
||||
manifest_hash,
|
||||
manifest_items,
|
||||
on_stdout=on_stdout,
|
||||
on_stderr=on_stderr,
|
||||
should_cancel=should_cancel,
|
||||
)
|
||||
build_global_overlay(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel)
|
||||
refreshed.append(source_key)
|
||||
return sorted(refreshed)
|
||||
|
||||
|
||||
def _refresh_source(source_key: str, manifest_hash: str, manifest_items: list[GlobalMapManifestItem], *, on_stdout, on_stderr, should_cancel) -> Overlay:
|
||||
now = datetime.now(UTC)
|
||||
desired_keys = {item.item_key for item in manifest_items}
|
||||
with session_scope() as db:
|
||||
source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == source_key))
|
||||
if source is None:
|
||||
raise ValueError(f"global overlay source {source_key!r} not found")
|
||||
overlay = db.scalar(select(Overlay).where(Overlay.id == source.overlay_id))
|
||||
if overlay is None:
|
||||
raise ValueError(f"overlay for source {source_key!r} not found")
|
||||
existing_items = {item.item_key: item for item in db.scalars(select(GlobalOverlayItem).where(GlobalOverlayItem.source_id == source.id)).all()}
|
||||
for old_key, old_item in list(existing_items.items()):
|
||||
if old_key not in desired_keys:
|
||||
db.delete(old_item)
|
||||
for manifest_item in manifest_items:
|
||||
item = existing_items.get(manifest_item.item_key)
|
||||
if item is None:
|
||||
item = GlobalOverlayItem(source_id=source.id, item_key=manifest_item.item_key, download_url=manifest_item.download_url)
|
||||
db.add(item)
|
||||
db.flush()
|
||||
item.display_name = manifest_item.display_name
|
||||
item.download_url = manifest_item.download_url
|
||||
item.expected_vpk_name = manifest_item.expected_vpk_name
|
||||
item.expected_size = manifest_item.expected_size
|
||||
item.expected_md5 = manifest_item.expected_md5
|
||||
item.updated_at = now
|
||||
source.last_manifest_hash = manifest_hash
|
||||
source.last_refreshed_at = now
|
||||
source.last_error = ""
|
||||
source.updated_at = now
|
||||
db.expunge(overlay)
|
||||
|
||||
for manifest_item in manifest_items:
|
||||
if should_cancel():
|
||||
on_stderr(f"{source_key}: refresh cancelled during downloads")
|
||||
return overlay
|
||||
_refresh_item(source_key, manifest_item, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel)
|
||||
return overlay
|
||||
|
||||
|
||||
def _refresh_item(source_key: str, manifest_item: GlobalMapManifestItem, *, on_stdout, on_stderr, should_cancel) -> None:
|
||||
try:
|
||||
files, etag, last_modified, content_length = download_and_extract_item(source_key, manifest_item, should_cancel=should_cancel)
|
||||
except Exception as exc:
|
||||
with session_scope() as db:
|
||||
source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == source_key))
|
||||
if source is not None:
|
||||
item = db.scalar(select(GlobalOverlayItem).where(GlobalOverlayItem.source_id == source.id, GlobalOverlayItem.item_key == manifest_item.item_key))
|
||||
if item is not None:
|
||||
item.last_error = str(exc)
|
||||
on_stderr(f"{source_key}: {manifest_item.item_key}: {exc}")
|
||||
return
|
||||
|
||||
now = datetime.now(UTC)
|
||||
with session_scope() as db:
|
||||
source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == source_key))
|
||||
if source is None:
|
||||
raise ValueError(f"global overlay source {source_key!r} not found")
|
||||
item = db.scalar(select(GlobalOverlayItem).where(GlobalOverlayItem.source_id == source.id, GlobalOverlayItem.item_key == manifest_item.item_key))
|
||||
if item is None:
|
||||
raise ValueError(f"global overlay item {manifest_item.item_key!r} not found")
|
||||
db.query(GlobalOverlayItemFile).filter_by(item_id=item.id).delete()
|
||||
for vpk_name, cache_path, size, md5 in files:
|
||||
db.add(GlobalOverlayItemFile(item_id=item.id, vpk_name=vpk_name, cache_path=cache_path, size=size, md5=md5))
|
||||
item.etag = etag
|
||||
item.last_modified = last_modified
|
||||
item.content_length = content_length
|
||||
item.last_downloaded_at = now
|
||||
item.last_error = ""
|
||||
item.updated_at = now
|
||||
on_stdout(f"{source_key}: refreshed {manifest_item.item_key} ({len(files)} vpk file(s))")
|
||||
|
||||
|
||||
def download_and_extract_item(source_key: str, item: GlobalMapManifestItem, *, should_cancel) -> tuple[list[tuple[str, str, int, str]], str, str, int | None]:
|
||||
archives = archive_dir(source_key)
|
||||
vpks = vpk_dir(source_key)
|
||||
archives.mkdir(parents=True, exist_ok=True)
|
||||
vpks.mkdir(parents=True, exist_ok=True)
|
||||
archive_name = item.download_url.rsplit("/", 1)[-1]
|
||||
archive_path = archives / archive_name
|
||||
etag, last_modified, content_length = download_archive(item.download_url, archive_path, should_cancel=should_cancel)
|
||||
with tempfile.TemporaryDirectory(prefix="left4me-global-map-") as tmp:
|
||||
tmp_dir = Path(tmp)
|
||||
if archive_name.lower().endswith(".7z"):
|
||||
extracted = safe_extract_7z_vpks(archive_path, tmp_dir)
|
||||
elif archive_name.lower().endswith(".zip"):
|
||||
extracted = safe_extract_zip_vpks(archive_path, tmp_dir)
|
||||
else:
|
||||
raise ValueError(f"unsupported archive extension for {archive_name}")
|
||||
results: list[tuple[str, str, int, str]] = []
|
||||
for path in extracted:
|
||||
if item.expected_vpk_name and path.name != item.expected_vpk_name:
|
||||
continue
|
||||
size = path.stat().st_size
|
||||
md5 = extracted_vpk_md5(path)
|
||||
if item.expected_size is not None and size != item.expected_size:
|
||||
raise ValueError(f"{path.name} size mismatch: expected {item.expected_size}, got {size}")
|
||||
if item.expected_md5 and md5 != item.expected_md5:
|
||||
raise ValueError(f"{path.name} md5 mismatch: expected {item.expected_md5}, got {md5}")
|
||||
final = vpks / path.name
|
||||
shutil.move(str(path), str(final))
|
||||
results.append((path.name, f"{source_key}/vpks/{path.name}", size, md5))
|
||||
if not results:
|
||||
raise ValueError(f"no expected .vpk files extracted from {archive_name}")
|
||||
return results, etag, last_modified, content_length
|
||||
|
||||
|
||||
def build_global_overlay(overlay: Overlay, *, on_stdout, on_stderr, should_cancel) -> None:
|
||||
from l4d2web.services.overlay_builders import BUILDERS
|
||||
|
||||
builder = BUILDERS.get(overlay.type)
|
||||
if builder is None:
|
||||
raise ValueError(f"no builder registered for overlay type {overlay.type!r}")
|
||||
builder.build(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel)
|
||||
|
|
@ -1,112 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from l4d2host.paths import get_left4me_root
|
||||
|
||||
from l4d2web.models import GlobalOverlaySource, Job, Overlay
|
||||
from l4d2web.services.overlay_creation import generate_overlay_path
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ManagedGlobalOverlay:
|
||||
name: str
|
||||
overlay_type: str
|
||||
source_type: str
|
||||
source_url: str
|
||||
|
||||
|
||||
GLOBAL_OVERLAYS = (
|
||||
ManagedGlobalOverlay(
|
||||
name="l4d2center-maps",
|
||||
overlay_type="l4d2center_maps",
|
||||
source_type="l4d2center_csv",
|
||||
source_url="https://l4d2center.com/maps/servers/index.csv",
|
||||
),
|
||||
ManagedGlobalOverlay(
|
||||
name="cedapug-maps",
|
||||
overlay_type="cedapug_maps",
|
||||
source_type="cedapug_custom_page",
|
||||
source_url="https://cedapug.com/custom",
|
||||
),
|
||||
)
|
||||
|
||||
MANAGED_GLOBAL_OVERLAY_TYPES = {overlay.overlay_type for overlay in GLOBAL_OVERLAYS}
|
||||
USER_CREATABLE_TYPES = {"workshop"}
|
||||
ADMIN_CREATABLE_TYPES = {"workshop"}
|
||||
|
||||
|
||||
def is_creatable_overlay_type(overlay_type: str, *, admin: bool) -> bool:
|
||||
allowed = ADMIN_CREATABLE_TYPES if admin else USER_CREATABLE_TYPES
|
||||
return overlay_type in allowed
|
||||
|
||||
|
||||
def ensure_global_overlays(session: Session) -> set[str]:
|
||||
created_sources: set[str] = set()
|
||||
for managed in GLOBAL_OVERLAYS:
|
||||
overlay = session.scalar(
|
||||
select(Overlay).where(Overlay.name == managed.name, Overlay.user_id.is_(None))
|
||||
)
|
||||
overlay_created = overlay is None
|
||||
if overlay is None:
|
||||
overlay = Overlay(name=managed.name, path="", type=managed.overlay_type, user_id=None)
|
||||
session.add(overlay)
|
||||
session.flush()
|
||||
overlay.path = generate_overlay_path(overlay.id)
|
||||
else:
|
||||
overlay.type = managed.overlay_type
|
||||
overlay.user_id = None
|
||||
if not overlay.path:
|
||||
overlay.path = generate_overlay_path(overlay.id)
|
||||
|
||||
target = get_left4me_root() / "overlays" / overlay.path
|
||||
os.makedirs(target, exist_ok=not overlay_created)
|
||||
|
||||
source = session.scalar(
|
||||
select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == managed.name)
|
||||
)
|
||||
if source is None:
|
||||
source = GlobalOverlaySource(
|
||||
overlay_id=overlay.id,
|
||||
source_key=managed.name,
|
||||
source_type=managed.source_type,
|
||||
source_url=managed.source_url,
|
||||
)
|
||||
session.add(source)
|
||||
created_sources.add(managed.name)
|
||||
else:
|
||||
source.overlay_id = overlay.id
|
||||
source.source_type = managed.source_type
|
||||
source.source_url = managed.source_url
|
||||
|
||||
session.flush()
|
||||
|
||||
return created_sources
|
||||
|
||||
|
||||
def enqueue_refresh_global_overlays(session: Session, *, user_id: int | None) -> Job:
|
||||
existing = session.scalar(
|
||||
select(Job)
|
||||
.where(
|
||||
Job.operation == "refresh_global_overlays",
|
||||
Job.state.in_({"queued", "running", "cancelling"}),
|
||||
)
|
||||
.order_by(Job.created_at, Job.id)
|
||||
)
|
||||
if existing is not None:
|
||||
return existing
|
||||
|
||||
job = Job(
|
||||
user_id=user_id,
|
||||
server_id=None,
|
||||
overlay_id=None,
|
||||
operation="refresh_global_overlays",
|
||||
state="queued",
|
||||
)
|
||||
session.add(job)
|
||||
session.flush()
|
||||
return job
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
from pathlib import Path
|
||||
from zipfile import ZipFile
|
||||
|
||||
from l4d2web.services.global_map_cache import (
|
||||
extracted_vpk_md5,
|
||||
global_overlay_cache_root,
|
||||
safe_extract_zip_vpks,
|
||||
source_cache_root,
|
||||
)
|
||||
|
||||
|
||||
def test_global_overlay_cache_paths(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
|
||||
assert global_overlay_cache_root() == tmp_path / "global_overlay_cache"
|
||||
assert source_cache_root("l4d2center-maps") == tmp_path / "global_overlay_cache" / "l4d2center-maps"
|
||||
|
||||
|
||||
def test_safe_extract_zip_vpks_extracts_only_vpks(tmp_path):
|
||||
archive = tmp_path / "maps.zip"
|
||||
with ZipFile(archive, "w") as zf:
|
||||
zf.writestr("FatalFreight.vpk", b"vpk-bytes")
|
||||
zf.writestr("readme.txt", b"ignore")
|
||||
|
||||
out_dir = tmp_path / "out"
|
||||
files = safe_extract_zip_vpks(archive, out_dir)
|
||||
|
||||
assert files == [out_dir / "FatalFreight.vpk"]
|
||||
assert (out_dir / "FatalFreight.vpk").read_bytes() == b"vpk-bytes"
|
||||
assert not (out_dir / "readme.txt").exists()
|
||||
|
||||
|
||||
def test_safe_extract_zip_vpks_rejects_path_traversal(tmp_path):
|
||||
archive = tmp_path / "bad.zip"
|
||||
with ZipFile(archive, "w") as zf:
|
||||
zf.writestr("../evil.vpk", b"bad")
|
||||
|
||||
try:
|
||||
safe_extract_zip_vpks(archive, tmp_path / "out")
|
||||
except ValueError as exc:
|
||||
assert "unsafe archive member" in str(exc)
|
||||
else:
|
||||
raise AssertionError("path traversal must fail")
|
||||
|
||||
|
||||
def test_extracted_vpk_md5(tmp_path):
|
||||
p = tmp_path / "x.vpk"
|
||||
p.write_bytes(b"abc")
|
||||
assert extracted_vpk_md5(p) == "900150983cd24fb0d6963f7d28e17f72"
|
||||
|
|
@ -1,65 +0,0 @@
|
|||
from l4d2web.services.global_map_sources import (
|
||||
GlobalMapManifestItem,
|
||||
parse_cedapug_custom_html,
|
||||
parse_l4d2center_csv,
|
||||
)
|
||||
|
||||
|
||||
def test_parse_l4d2center_csv_semicolon_manifest():
|
||||
raw = """Name;Size;md5;Download link
|
||||
carriedoff.vpk;128660532;0380e12c57156574e17a96da1252cf21;https://l4d2center.com/maps/servers/carriedoff.7z
|
||||
"""
|
||||
|
||||
items = parse_l4d2center_csv(raw)
|
||||
|
||||
assert items == [
|
||||
GlobalMapManifestItem(
|
||||
item_key="carriedoff.vpk",
|
||||
display_name="carriedoff.vpk",
|
||||
download_url="https://l4d2center.com/maps/servers/carriedoff.7z",
|
||||
expected_vpk_name="carriedoff.vpk",
|
||||
expected_size=128660532,
|
||||
expected_md5="0380e12c57156574e17a96da1252cf21",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_parse_l4d2center_rejects_missing_header():
|
||||
try:
|
||||
parse_l4d2center_csv("bad,data\n")
|
||||
except ValueError as exc:
|
||||
assert "Name;Size;md5;Download link" in str(exc)
|
||||
else:
|
||||
raise AssertionError("bad header must fail")
|
||||
|
||||
|
||||
def test_parse_cedapug_custom_html_extracts_relative_zip_links():
|
||||
html = """
|
||||
<script>renderCustomMapDownloads([
|
||||
["c1m1_hotel","<span style='color: #977d4c;'>Dead Center<\\/span>"],
|
||||
["l4d2_ff01_woods","<span style='color: #854C34;'>Fatal Freight<\\/span>","\\/maps\\/FatalFreight.zip"],
|
||||
["external","External","https://steamcommunity.com/sharedfiles/filedetails/?id=123"]
|
||||
])</script>
|
||||
"""
|
||||
|
||||
items = parse_cedapug_custom_html(html)
|
||||
|
||||
assert items == [
|
||||
GlobalMapManifestItem(
|
||||
item_key="FatalFreight.zip",
|
||||
display_name="Fatal Freight",
|
||||
download_url="https://cedapug.com/maps/FatalFreight.zip",
|
||||
expected_vpk_name="",
|
||||
expected_size=None,
|
||||
expected_md5="",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_parse_cedapug_custom_html_rejects_missing_data():
|
||||
try:
|
||||
parse_cedapug_custom_html("<html></html>")
|
||||
except ValueError as exc:
|
||||
assert "renderCustomMapDownloads" in str(exc)
|
||||
else:
|
||||
raise AssertionError("missing embedded data must fail")
|
||||
|
|
@ -1,89 +0,0 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource, Overlay
|
||||
from l4d2web.services.overlay_builders import BUILDERS
|
||||
|
||||
|
||||
def seed_source(tmp_path: Path, monkeypatch) -> int:
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'builder.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
cache_vpk = tmp_path / "global_overlay_cache" / "l4d2center-maps" / "vpks" / "carriedoff.vpk"
|
||||
cache_vpk.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_vpk.write_bytes(b"vpk")
|
||||
with session_scope() as db:
|
||||
overlay = Overlay(name="l4d2center-maps", path="7", type="l4d2center_maps", user_id=None)
|
||||
db.add(overlay)
|
||||
db.flush()
|
||||
source = GlobalOverlaySource(
|
||||
overlay_id=overlay.id,
|
||||
source_key="l4d2center-maps",
|
||||
source_type="l4d2center_csv",
|
||||
source_url="https://l4d2center.com/maps/servers/index.csv",
|
||||
)
|
||||
db.add(source)
|
||||
db.flush()
|
||||
item = GlobalOverlayItem(
|
||||
source_id=source.id,
|
||||
item_key="carriedoff.vpk",
|
||||
display_name="carriedoff.vpk",
|
||||
download_url="https://example.invalid/carriedoff.7z",
|
||||
expected_vpk_name="carriedoff.vpk",
|
||||
)
|
||||
db.add(item)
|
||||
db.flush()
|
||||
db.add(
|
||||
GlobalOverlayItemFile(
|
||||
item_id=item.id,
|
||||
vpk_name="carriedoff.vpk",
|
||||
cache_path="l4d2center-maps/vpks/carriedoff.vpk",
|
||||
size=3,
|
||||
md5="",
|
||||
)
|
||||
)
|
||||
db.flush()
|
||||
return overlay.id
|
||||
|
||||
|
||||
def test_registry_contains_global_map_builders():
|
||||
assert "l4d2center_maps" in BUILDERS
|
||||
assert "cedapug_maps" in BUILDERS
|
||||
|
||||
|
||||
def test_global_builder_creates_absolute_symlink(tmp_path, monkeypatch):
|
||||
overlay_id = seed_source(tmp_path, monkeypatch)
|
||||
out: list[str] = []
|
||||
err: list[str] = []
|
||||
with session_scope() as db:
|
||||
overlay = db.query(Overlay).filter_by(id=overlay_id).one()
|
||||
BUILDERS["l4d2center_maps"].build(overlay, on_stdout=out.append, on_stderr=err.append, should_cancel=lambda: False)
|
||||
|
||||
link = tmp_path / "overlays" / "7" / "left4dead2" / "addons" / "carriedoff.vpk"
|
||||
assert link.is_symlink()
|
||||
assert os.path.isabs(os.readlink(link))
|
||||
assert link.resolve() == (tmp_path / "global_overlay_cache" / "l4d2center-maps" / "vpks" / "carriedoff.vpk").resolve()
|
||||
assert any("global overlay" in line for line in out)
|
||||
|
||||
|
||||
def test_global_builder_removes_obsolete_managed_symlink_but_keeps_foreign(tmp_path, monkeypatch):
|
||||
overlay_id = seed_source(tmp_path, monkeypatch)
|
||||
addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
|
||||
addons.mkdir(parents=True, exist_ok=True)
|
||||
foreign_target = tmp_path / "foreign.vpk"
|
||||
foreign_target.write_bytes(b"foreign")
|
||||
os.symlink(str(foreign_target), addons / "foreign.vpk")
|
||||
|
||||
with session_scope() as db:
|
||||
overlay = db.query(Overlay).filter_by(id=overlay_id).one()
|
||||
BUILDERS["l4d2center_maps"].build(overlay, on_stdout=lambda line: None, on_stderr=lambda line: None, should_cancel=lambda: False)
|
||||
source = db.query(GlobalOverlaySource).filter_by(source_key="l4d2center-maps").one()
|
||||
db.query(GlobalOverlayItem).filter_by(source_id=source.id).delete()
|
||||
|
||||
with session_scope() as db:
|
||||
overlay = db.query(Overlay).filter_by(id=overlay_id).one()
|
||||
BUILDERS["l4d2center_maps"].build(overlay, on_stdout=lambda line: None, on_stderr=lambda line: None, should_cancel=lambda: False)
|
||||
|
||||
assert not (addons / "carriedoff.vpk").exists()
|
||||
assert (addons / "foreign.vpk").is_symlink()
|
||||
|
|
@ -1,19 +0,0 @@
|
|||
from l4d2web.app import create_app
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import Job
|
||||
|
||||
|
||||
def test_refresh_global_overlays_cli_enqueues_system_job(tmp_path, monkeypatch):
|
||||
db_url = f"sqlite:///{tmp_path/'cli.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
result = app.test_cli_runner().invoke(args=["refresh-global-overlays"])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert "queued refresh_global_overlays job" in result.output
|
||||
with session_scope() as db:
|
||||
job = db.query(Job).filter_by(operation="refresh_global_overlays").one()
|
||||
assert job.user_id is None
|
||||
|
|
@ -1,154 +0,0 @@
|
|||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import (
|
||||
GlobalOverlayItem,
|
||||
GlobalOverlayItemFile,
|
||||
GlobalOverlaySource,
|
||||
Job,
|
||||
Overlay,
|
||||
User,
|
||||
)
|
||||
|
||||
|
||||
def test_system_job_allows_null_user_id(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'models.db'}")
|
||||
init_db()
|
||||
|
||||
with session_scope() as db:
|
||||
job = Job(
|
||||
user_id=None,
|
||||
server_id=None,
|
||||
overlay_id=None,
|
||||
operation="refresh_global_overlays",
|
||||
)
|
||||
db.add(job)
|
||||
db.flush()
|
||||
assert job.id is not None
|
||||
assert job.user_id is None
|
||||
|
||||
|
||||
def test_global_overlay_source_uniqueness(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'sources.db'}")
|
||||
init_db()
|
||||
|
||||
with session_scope() as db:
|
||||
overlay = Overlay(
|
||||
name="l4d2center-maps", path="1", type="l4d2center_maps", user_id=None
|
||||
)
|
||||
db.add(overlay)
|
||||
db.flush()
|
||||
db.add(
|
||||
GlobalOverlaySource(
|
||||
overlay_id=overlay.id,
|
||||
source_key="l4d2center-maps",
|
||||
source_type="l4d2center_csv",
|
||||
source_url="https://l4d2center.com/maps/servers/index.csv",
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
with session_scope() as db:
|
||||
other = Overlay(
|
||||
name="cedapug-maps", path="2", type="cedapug_maps", user_id=None
|
||||
)
|
||||
db.add(other)
|
||||
db.flush()
|
||||
db.add(
|
||||
GlobalOverlaySource(
|
||||
overlay_id=other.id,
|
||||
source_key="l4d2center-maps",
|
||||
source_type="l4d2center_csv",
|
||||
source_url="https://example.invalid/duplicate",
|
||||
)
|
||||
)
|
||||
except IntegrityError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("duplicate source_key must fail")
|
||||
|
||||
|
||||
def test_global_overlay_items_and_files_are_unique_per_parent(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'items.db'}")
|
||||
init_db()
|
||||
|
||||
with session_scope() as db:
|
||||
overlay = Overlay(name="cedapug-maps", path="1", type="cedapug_maps", user_id=None)
|
||||
db.add(overlay)
|
||||
db.flush()
|
||||
source = GlobalOverlaySource(
|
||||
overlay_id=overlay.id,
|
||||
source_key="cedapug-maps",
|
||||
source_type="cedapug_custom_page",
|
||||
source_url="https://cedapug.com/custom",
|
||||
)
|
||||
db.add(source)
|
||||
db.flush()
|
||||
item = GlobalOverlayItem(
|
||||
source_id=source.id,
|
||||
item_key="FatalFreight.zip",
|
||||
display_name="Fatal Freight",
|
||||
download_url="https://cedapug.com/maps/FatalFreight.zip",
|
||||
expected_vpk_name="FatalFreight.vpk",
|
||||
)
|
||||
db.add(item)
|
||||
db.flush()
|
||||
db.add(
|
||||
GlobalOverlayItemFile(
|
||||
item_id=item.id,
|
||||
vpk_name="FatalFreight.vpk",
|
||||
cache_path="cedapug-maps/vpks/FatalFreight.vpk",
|
||||
size=123,
|
||||
md5="",
|
||||
)
|
||||
)
|
||||
item_id = item.id
|
||||
|
||||
try:
|
||||
with session_scope() as db:
|
||||
source = db.query(GlobalOverlaySource).filter_by(source_key="cedapug-maps").one()
|
||||
db.add(
|
||||
GlobalOverlayItem(
|
||||
source_id=source.id,
|
||||
item_key="FatalFreight.zip",
|
||||
display_name="Fatal Freight duplicate",
|
||||
download_url="https://cedapug.com/maps/FatalFreight.zip",
|
||||
expected_vpk_name="FatalFreight.vpk",
|
||||
)
|
||||
)
|
||||
except IntegrityError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("duplicate item_key per source must fail")
|
||||
|
||||
try:
|
||||
with session_scope() as db:
|
||||
db.add(
|
||||
GlobalOverlayItemFile(
|
||||
item_id=item_id,
|
||||
vpk_name="FatalFreight.vpk",
|
||||
cache_path="cedapug-maps/vpks/FatalFreight-copy.vpk",
|
||||
size=456,
|
||||
md5="",
|
||||
)
|
||||
)
|
||||
except IntegrityError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("duplicate vpk_name per item must fail")
|
||||
|
||||
|
||||
def test_normal_user_rows_still_require_real_users(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'users.db'}")
|
||||
init_db()
|
||||
|
||||
with session_scope() as db:
|
||||
user = User(username="alice", password_digest="digest", admin=False)
|
||||
db.add(user)
|
||||
db.flush()
|
||||
job = Job(user_id=user.id, server_id=None, operation="install", state="queued")
|
||||
db.add(job)
|
||||
db.flush()
|
||||
|
||||
assert job.id is not None
|
||||
assert job.user_id == user.id
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
from pathlib import Path
|
||||
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource
|
||||
from l4d2web.services.global_map_sources import GlobalMapManifestItem
|
||||
|
||||
|
||||
def test_refresh_global_overlays_updates_manifest_items_and_invokes_builders(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'refresh.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
|
||||
from l4d2web.services import global_overlay_refresh
|
||||
monkeypatch.setattr(
|
||||
global_overlay_refresh,
|
||||
"fetch_l4d2center_manifest",
|
||||
lambda: ("hash-center", [GlobalMapManifestItem("carriedoff.vpk", "carriedoff.vpk", "https://example.invalid/carriedoff.7z", "carriedoff.vpk", 3, "" )]),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
global_overlay_refresh,
|
||||
"fetch_cedapug_manifest",
|
||||
lambda: ("hash-ceda", [GlobalMapManifestItem("FatalFreight.zip", "Fatal Freight", "https://example.invalid/FatalFreight.zip")]),
|
||||
)
|
||||
|
||||
def fake_download_and_extract(source_key, item, *, should_cancel):
|
||||
target = tmp_path / "global_overlay_cache" / source_key / "vpks" / (item.expected_vpk_name or item.item_key.replace(".zip", ".vpk"))
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
target.write_bytes(b"vpk")
|
||||
return [(target.name, f"{source_key}/vpks/{target.name}", 3, "")], "etag", "last-modified", 3
|
||||
|
||||
built: list[str] = []
|
||||
monkeypatch.setattr(global_overlay_refresh, "download_and_extract_item", fake_download_and_extract)
|
||||
monkeypatch.setattr(global_overlay_refresh, "build_global_overlay", lambda overlay, **kwargs: built.append(overlay.name))
|
||||
|
||||
out: list[str] = []
|
||||
result = global_overlay_refresh.refresh_global_overlays(on_stdout=out.append, on_stderr=out.append, should_cancel=lambda: False)
|
||||
|
||||
assert result == ["cedapug-maps", "l4d2center-maps"]
|
||||
assert set(built) == {"cedapug-maps", "l4d2center-maps"}
|
||||
with session_scope() as db:
|
||||
assert db.query(GlobalOverlaySource).count() == 2
|
||||
assert db.query(GlobalOverlayItem).count() == 2
|
||||
assert db.query(GlobalOverlayItemFile).count() == 2
|
||||
|
||||
|
||||
def test_refresh_removes_items_absent_from_manifest(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'remove.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
|
||||
from l4d2web.services.global_overlays import ensure_global_overlays
|
||||
from l4d2web.services import global_overlay_refresh
|
||||
|
||||
with session_scope() as db:
|
||||
ensure_global_overlays(db)
|
||||
source = db.query(GlobalOverlaySource).filter_by(source_key="l4d2center-maps").one()
|
||||
item = GlobalOverlayItem(source_id=source.id, item_key="old.vpk", display_name="old.vpk", download_url="https://example.invalid/old.7z")
|
||||
db.add(item)
|
||||
db.flush()
|
||||
db.add(GlobalOverlayItemFile(item_id=item.id, vpk_name="old.vpk", cache_path="l4d2center-maps/vpks/old.vpk", size=3))
|
||||
|
||||
monkeypatch.setattr(global_overlay_refresh, "fetch_l4d2center_manifest", lambda: ("empty-center", []))
|
||||
monkeypatch.setattr(global_overlay_refresh, "fetch_cedapug_manifest", lambda: ("empty-ceda", []))
|
||||
monkeypatch.setattr(global_overlay_refresh, "build_global_overlay", lambda overlay, **kwargs: None)
|
||||
|
||||
global_overlay_refresh.refresh_global_overlays(on_stdout=lambda line: None, on_stderr=lambda line: None, should_cancel=lambda: False)
|
||||
|
||||
with session_scope() as db:
|
||||
assert db.query(GlobalOverlayItem).filter_by(item_key="old.vpk").count() == 0
|
||||
|
|
@ -1,167 +0,0 @@
|
|||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import GlobalOverlaySource, Job, Overlay, User
|
||||
from l4d2web.services.global_overlays import (
|
||||
enqueue_refresh_global_overlays,
|
||||
ensure_global_overlays,
|
||||
is_creatable_overlay_type,
|
||||
)
|
||||
|
||||
|
||||
def test_ensure_global_overlays_creates_singletons_and_directories(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'global_overlays.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
|
||||
with session_scope() as session:
|
||||
created = ensure_global_overlays(session)
|
||||
assert created == {"cedapug-maps", "l4d2center-maps"}
|
||||
|
||||
second = ensure_global_overlays(session)
|
||||
assert second == set()
|
||||
|
||||
overlays = session.scalars(select(Overlay).order_by(Overlay.name)).all()
|
||||
assert [overlay.name for overlay in overlays] == ["cedapug-maps", "l4d2center-maps"]
|
||||
assert [overlay.type for overlay in overlays] == ["cedapug_maps", "l4d2center_maps"]
|
||||
assert [overlay.user_id for overlay in overlays] == [None, None]
|
||||
assert len({overlay.path for overlay in overlays}) == 2
|
||||
for overlay in overlays:
|
||||
assert (tmp_path / "overlays" / overlay.path).is_dir()
|
||||
|
||||
sources = session.scalars(select(GlobalOverlaySource).order_by(GlobalOverlaySource.source_key)).all()
|
||||
assert [source.source_key for source in sources] == ["cedapug-maps", "l4d2center-maps"]
|
||||
assert [source.source_type for source in sources] == [
|
||||
"cedapug_custom_page",
|
||||
"l4d2center_csv",
|
||||
]
|
||||
|
||||
|
||||
def test_ensure_global_overlays_repairs_existing_rows(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'global_overlay_repair.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
|
||||
with session_scope() as session:
|
||||
overlay = Overlay(name="cedapug-maps", path="legacy", type="cedapug_maps", user_id=None)
|
||||
session.add(overlay)
|
||||
session.flush()
|
||||
session.add(
|
||||
GlobalOverlaySource(
|
||||
overlay_id=overlay.id,
|
||||
source_key="cedapug-maps",
|
||||
source_type="wrong",
|
||||
source_url="https://example.invalid/wrong",
|
||||
)
|
||||
)
|
||||
|
||||
(tmp_path / "overlays" / "legacy").mkdir(parents=True)
|
||||
|
||||
with session_scope() as session:
|
||||
created = ensure_global_overlays(session)
|
||||
assert created == {"l4d2center-maps"}
|
||||
|
||||
repaired = session.scalar(select(Overlay).where(Overlay.name == "cedapug-maps"))
|
||||
assert repaired is not None
|
||||
assert repaired.type == "cedapug_maps"
|
||||
assert repaired.user_id is None
|
||||
assert (tmp_path / "overlays" / repaired.path).is_dir()
|
||||
|
||||
source = session.scalar(
|
||||
select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == "cedapug-maps")
|
||||
)
|
||||
assert source is not None
|
||||
assert source.source_type == "cedapug_custom_page"
|
||||
assert source.source_url == "https://cedapug.com/custom"
|
||||
|
||||
|
||||
def test_ensure_global_overlays_does_not_hijack_private_overlay_name(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'global_overlay_private_name.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
|
||||
with session_scope() as session:
|
||||
user = User(username="alice", password_digest="digest", admin=False)
|
||||
session.add(user)
|
||||
session.flush()
|
||||
private = Overlay(
|
||||
name="l4d2center-maps",
|
||||
path="private-l4d2center",
|
||||
type="workshop",
|
||||
user_id=user.id,
|
||||
)
|
||||
session.add(private)
|
||||
session.flush()
|
||||
private_id = private.id
|
||||
private_user_id = user.id
|
||||
|
||||
with session_scope() as session:
|
||||
created = ensure_global_overlays(session)
|
||||
|
||||
assert created == {"cedapug-maps", "l4d2center-maps"}
|
||||
private = session.scalar(select(Overlay).where(Overlay.id == private_id))
|
||||
assert private is not None
|
||||
assert private.user_id == private_user_id
|
||||
assert private.type == "workshop"
|
||||
assert private.path == "private-l4d2center"
|
||||
|
||||
system = session.scalar(
|
||||
select(Overlay).where(Overlay.name == "l4d2center-maps", Overlay.user_id.is_(None))
|
||||
)
|
||||
assert system is not None
|
||||
assert system.id != private_id
|
||||
assert system.type == "l4d2center_maps"
|
||||
assert (tmp_path / "overlays" / system.path).is_dir()
|
||||
|
||||
source = session.scalar(
|
||||
select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == "l4d2center-maps")
|
||||
)
|
||||
assert source is not None
|
||||
assert source.overlay_id == system.id
|
||||
|
||||
|
||||
def test_enqueue_refresh_global_overlays_coalesces_active_jobs(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'refresh_jobs.db'}")
|
||||
init_db()
|
||||
|
||||
for state in ("queued", "running", "cancelling"):
|
||||
with session_scope() as session:
|
||||
session.query(Job).delete()
|
||||
existing = Job(
|
||||
user_id=7,
|
||||
server_id=None,
|
||||
overlay_id=None,
|
||||
operation="refresh_global_overlays",
|
||||
state=state,
|
||||
)
|
||||
session.add(existing)
|
||||
session.flush()
|
||||
existing_id = existing.id
|
||||
|
||||
job = enqueue_refresh_global_overlays(session, user_id=None)
|
||||
assert job.id == existing_id
|
||||
assert session.query(Job).filter_by(operation="refresh_global_overlays").count() == 1
|
||||
|
||||
|
||||
def test_enqueue_refresh_global_overlays_creates_system_job(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'refresh_system_job.db'}")
|
||||
init_db()
|
||||
|
||||
with session_scope() as session:
|
||||
job = enqueue_refresh_global_overlays(session, user_id=None)
|
||||
|
||||
assert job.id is not None
|
||||
assert job.user_id is None
|
||||
assert job.server_id is None
|
||||
assert job.overlay_id is None
|
||||
assert job.operation == "refresh_global_overlays"
|
||||
assert job.state == "queued"
|
||||
|
||||
|
||||
def test_is_creatable_overlay_type_policy():
|
||||
assert is_creatable_overlay_type("workshop", admin=False) is True
|
||||
assert is_creatable_overlay_type("workshop", admin=True) is True
|
||||
assert is_creatable_overlay_type("external", admin=False) is False
|
||||
assert is_creatable_overlay_type("external", admin=True) is False
|
||||
assert is_creatable_overlay_type("l4d2center_maps", admin=True) is False
|
||||
assert is_creatable_overlay_type("cedapug_maps", admin=True) is False
|
||||
|
|
@ -260,53 +260,3 @@ def test_initialize_fails_fast_on_uncached_workshop_items(
|
|||
assert all("initialize" not in cmd for cmd in invocations), invocations
|
||||
|
||||
|
||||
def test_initialize_fails_when_global_overlay_cache_file_missing(tmp_path, monkeypatch):
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import (
|
||||
Blueprint,
|
||||
BlueprintOverlay,
|
||||
GlobalOverlayItem,
|
||||
GlobalOverlayItemFile,
|
||||
GlobalOverlaySource,
|
||||
Overlay,
|
||||
Server,
|
||||
User,
|
||||
)
|
||||
from l4d2web.services.l4d2_facade import initialize_server
|
||||
|
||||
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'facade-global.db'}")
|
||||
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
||||
init_db()
|
||||
|
||||
with session_scope() as db:
|
||||
user = User(username="alice", password_digest="digest")
|
||||
db.add(user)
|
||||
db.flush()
|
||||
overlay = Overlay(name="l4d2center-maps", path="7", type="l4d2center_maps", user_id=None)
|
||||
db.add(overlay)
|
||||
db.flush()
|
||||
source = GlobalOverlaySource(overlay_id=overlay.id, source_key="l4d2center-maps", source_type="l4d2center_csv", source_url="https://l4d2center.com/maps/servers/index.csv")
|
||||
db.add(source)
|
||||
db.flush()
|
||||
item = GlobalOverlayItem(source_id=source.id, item_key="carriedoff.vpk", display_name="carriedoff.vpk", download_url="https://example.invalid/carriedoff.7z")
|
||||
db.add(item)
|
||||
db.flush()
|
||||
db.add(GlobalOverlayItemFile(item_id=item.id, vpk_name="carriedoff.vpk", cache_path="l4d2center-maps/vpks/carriedoff.vpk", size=123))
|
||||
blueprint = Blueprint(user_id=user.id, name="bp", arguments="[]", config="[]")
|
||||
db.add(blueprint)
|
||||
db.flush()
|
||||
db.add(BlueprintOverlay(blueprint_id=blueprint.id, overlay_id=overlay.id, position=0))
|
||||
server = Server(user_id=user.id, blueprint_id=blueprint.id, name="alpha", port=27015)
|
||||
db.add(server)
|
||||
db.flush()
|
||||
server_id = server.id
|
||||
|
||||
monkeypatch.setattr("l4d2web.services.host_commands.run_command", lambda *args, **kwargs: None)
|
||||
|
||||
try:
|
||||
initialize_server(server_id)
|
||||
except RuntimeError as exc:
|
||||
assert "carriedoff.vpk" in str(exc)
|
||||
assert "l4d2center-maps" in str(exc)
|
||||
else:
|
||||
raise AssertionError("missing global overlay cache file must fail")
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import pytest
|
|||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import Blueprint, BlueprintOverlay, GlobalOverlaySource, Overlay, User
|
||||
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
|
||||
from l4d2web.services.security import validate_overlay_ref
|
||||
|
||||
|
||||
|
|
@ -38,9 +38,9 @@ def user_client_with_overlay(tmp_path, monkeypatch):
|
|||
with session_scope() as session:
|
||||
user = User(username="alice", password_digest=hash_password("secret"), admin=False)
|
||||
session.add(user)
|
||||
# System overlay (managed-global, no user_id), pre-existing.
|
||||
# System overlay (workshop, no user_id), pre-existing.
|
||||
session.add(
|
||||
Overlay(name="standard", path="standard", type="l4d2center_maps", user_id=None)
|
||||
Overlay(name="standard", path="standard", type="workshop", user_id=None)
|
||||
)
|
||||
session.flush()
|
||||
user_id = user.id
|
||||
|
|
@ -62,16 +62,6 @@ def test_user_can_view_overlay_catalog(user_client_with_overlay) -> None:
|
|||
assert "Create overlay" in text
|
||||
|
||||
|
||||
def test_non_admin_can_view_managed_global_system_overlay(user_client_with_overlay) -> None:
|
||||
_create_managed_global_overlay()
|
||||
|
||||
response = user_client_with_overlay.get("/overlays")
|
||||
text = response.get_data(as_text=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert "l4d2center-maps" in text
|
||||
|
||||
|
||||
def test_admin_can_view_overlay_edit_controls(admin_client) -> None:
|
||||
response = admin_client.get("/overlays")
|
||||
text = response.get_data(as_text=True)
|
||||
|
|
@ -197,62 +187,6 @@ def test_admin_can_update_and_delete_overlay(admin_client) -> None:
|
|||
assert delete.status_code == 302
|
||||
|
||||
|
||||
def _create_managed_global_overlay() -> int:
|
||||
with session_scope() as session:
|
||||
overlay = Overlay(
|
||||
name="l4d2center-maps",
|
||||
path="managed-l4d2center",
|
||||
type="l4d2center_maps",
|
||||
user_id=None,
|
||||
)
|
||||
session.add(overlay)
|
||||
session.flush()
|
||||
session.add(
|
||||
GlobalOverlaySource(
|
||||
overlay_id=overlay.id,
|
||||
source_key="l4d2center-maps",
|
||||
source_type="l4d2center_csv",
|
||||
source_url="https://l4d2center.com/maps/servers/index.csv",
|
||||
)
|
||||
)
|
||||
return overlay.id
|
||||
|
||||
|
||||
def test_admin_cannot_update_managed_global_overlay(admin_client) -> None:
|
||||
overlay_id = _create_managed_global_overlay()
|
||||
|
||||
response = admin_client.post(
|
||||
f"/overlays/{overlay_id}",
|
||||
data={"name": "renamed"},
|
||||
headers={"X-CSRF-Token": "test-token"},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_admin_cannot_delete_managed_global_overlay(admin_client) -> None:
|
||||
overlay_id = _create_managed_global_overlay()
|
||||
|
||||
response = admin_client.post(
|
||||
f"/overlays/{overlay_id}/delete",
|
||||
headers={"X-CSRF-Token": "test-token"},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_admin_overlay_detail_hides_edit_for_managed_global_overlay(admin_client) -> None:
|
||||
overlay_id = _create_managed_global_overlay()
|
||||
|
||||
response = admin_client.get(f"/overlays/{overlay_id}")
|
||||
text = response.get_data(as_text=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert f'action="/overlays/{overlay_id}"' not in text
|
||||
assert "delete-overlay-modal" not in text
|
||||
|
||||
|
||||
|
||||
def test_update_overlay_rejects_duplicate_name(admin_client) -> None:
|
||||
ids: list[int] = []
|
||||
for name in ("standard", "competitive"):
|
||||
|
|
@ -305,13 +239,15 @@ def test_overlay_detail_page_lists_using_blueprints(admin_client) -> None:
|
|||
|
||||
|
||||
def test_non_admin_overlay_detail_only_lists_own_using_blueprints(user_client_with_overlay) -> None:
|
||||
overlay_id = _create_managed_global_overlay()
|
||||
with session_scope() as session:
|
||||
alice = session.query(User).filter_by(username="alice").one()
|
||||
other = User(username="mallory", password_digest=hash_password("secret"), admin=False)
|
||||
session.add(other)
|
||||
session.flush()
|
||||
|
||||
# Use the seeded system "standard" overlay (id=1).
|
||||
overlay_id = session.query(Overlay).filter_by(name="standard").one().id
|
||||
|
||||
own_bp = Blueprint(user_id=alice.id, name="own-bp", arguments="[]", config="[]")
|
||||
other_bp = Blueprint(user_id=other.id, name="other-private-bp", arguments="[]", config="[]")
|
||||
session.add_all([own_bp, other_bp])
|
||||
|
|
@ -328,12 +264,12 @@ def test_non_admin_overlay_detail_only_lists_own_using_blueprints(user_client_wi
|
|||
|
||||
|
||||
def test_blueprint_edit_lists_system_and_owned_overlays_only(user_client_with_overlay) -> None:
|
||||
system_overlay_id = _create_managed_global_overlay()
|
||||
with session_scope() as session:
|
||||
alice = session.query(User).filter_by(username="alice").one()
|
||||
other = User(username="mallory", password_digest=hash_password("secret"), admin=False)
|
||||
session.add(other)
|
||||
session.flush()
|
||||
system_overlay_id = session.query(Overlay).filter_by(name="standard").one().id
|
||||
foreign_overlay = Overlay(
|
||||
name="other-private-workshop",
|
||||
path="other-private-workshop",
|
||||
|
|
@ -349,7 +285,7 @@ def test_blueprint_edit_lists_system_and_owned_overlays_only(user_client_with_ov
|
|||
text = response.get_data(as_text=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert "l4d2center-maps" in text
|
||||
assert "standard" in text
|
||||
assert f'value="{system_overlay_id}"' in text
|
||||
assert "other-private-workshop" not in text
|
||||
|
||||
|
|
@ -359,27 +295,6 @@ def test_overlay_detail_page_404_when_missing(admin_client) -> None:
|
|||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_overlay_detail_hides_edit_for_non_admin_managed_global(user_client_with_overlay) -> None:
|
||||
# The seeded "standard" managed-global overlay (id=1, user_id=NULL) is read-only for non-admins.
|
||||
response = user_client_with_overlay.get("/overlays/1")
|
||||
text = response.get_data(as_text=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert "standard" in text
|
||||
assert 'action="/overlays/1"' not in text
|
||||
assert "delete-overlay-modal" not in text
|
||||
|
||||
|
||||
def test_managed_global_overlay_detail_shows_source_url(admin_client) -> None:
|
||||
overlay_id = _create_managed_global_overlay()
|
||||
|
||||
response = admin_client.get(f"/overlays/{overlay_id}")
|
||||
text = response.get_data(as_text=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert "https://l4d2center.com/maps/servers/index.csv" in text
|
||||
|
||||
|
||||
def test_overlay_update_redirects_to_detail(admin_client) -> None:
|
||||
create = admin_client.post(
|
||||
"/overlays",
|
||||
|
|
@ -422,9 +337,3 @@ def test_delete_overlay_rejects_in_use_overlay(admin_client) -> None:
|
|||
)
|
||||
|
||||
assert response.status_code == 409
|
||||
|
||||
|
||||
def test_admin_can_enqueue_refresh_global_overlays(admin_client):
|
||||
response = admin_client.post("/admin/global-overlays/refresh", headers={"X-CSRF-Token": "test-token"})
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"] == "/admin/jobs"
|
||||
|
|
|
|||
Loading…
Reference in a new issue