feat(l4d2-web): worker support for build_overlay and refresh_workshop_items

Extends SchedulerState with running_overlays / refresh_running /
blocked_servers_by_overlay, and updates can_start with the truth table:
install and refresh_workshop_items are global mutexes; build_overlay
serializes per-overlay; server jobs block on builds for any overlay
their blueprint references.

Adds enqueue_build_overlay coalescing helper that returns an existing
queued job for the same overlay rather than inserting a duplicate.

Adds run_job dispatch for build_overlay (BUILDERS[overlay.type].build)
and refresh_workshop_items (re-fetches metadata, re-downloads on
time_updated/filename change, enqueues coalesced rebuilds for affected
overlays).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
mwiegand 2026-05-07 16:44:10 +02:00
parent 700940d578
commit 38a6fbbe1e
No known key found for this signature in database
2 changed files with 488 additions and 8 deletions

View file

@ -10,13 +10,24 @@ from sqlalchemy.orm import Session
from typing import Callable from typing import Callable
from l4d2web.db import session_scope from l4d2web.db import session_scope
from l4d2web.models import Job, JobLog, Server from l4d2web.models import (
Blueprint,
BlueprintOverlay,
Job,
JobLog,
Overlay,
OverlayWorkshopItem,
Server,
WorkshopItem,
)
from l4d2web.services.host_commands import CommandCancelledError from l4d2web.services.host_commands import CommandCancelledError
TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"} TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
ACTIVE_JOB_STATES = {"running", "cancelling"} ACTIVE_JOB_STATES = {"running", "cancelling"}
SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"} SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}
OVERLAY_OPERATIONS = {"build_overlay"}
GLOBAL_OPERATIONS = {"install", "refresh_workshop_items"}
_claim_lock = threading.Lock() _claim_lock = threading.Lock()
_log_lock = threading.RLock() _log_lock = threading.RLock()
@ -27,17 +38,55 @@ _workers_started = False
@dataclass
class SchedulerState:
    """Snapshot of currently-active jobs, consulted by ``can_start``."""

    install_running: bool = False
    refresh_running: bool = False
    running_servers: set[int] = field(default_factory=set)
    running_overlays: set[int] = field(default_factory=set)
    # Servers whose blueprint references an overlay that is currently building.
    blocked_servers_by_overlay: set[int] = field(default_factory=set)


def can_start(job, state: SchedulerState) -> bool:
    """Truth table for the worker's claim policy.

    install / refresh_workshop_items are global mutexes — they block each
    other, all build_overlay jobs, and all server jobs.

    build_overlay(overlay_id=N) is per-overlay: it blocks on install/refresh
    and on another build for the same overlay. Different overlays may build
    concurrently.

    Server start/init blocks on install/refresh and on a build_overlay for
    any overlay referenced by the server's blueprint.
    """
    system_quiet = (
        not state.install_running
        and not state.refresh_running
        and not state.running_servers
        and not state.running_overlays
    )
    if job.operation in ("install", "refresh_workshop_items"):
        # Global mutexes: claimable only when nothing else is active at all.
        return system_quiet
    if job.operation == "build_overlay":
        if state.install_running or state.refresh_running:
            return False
        if job.overlay_id is None:
            return False
        # Serialize per overlay; other overlays may build in parallel.
        return job.overlay_id not in state.running_overlays
    # Everything else is a server operation.
    if state.install_running or state.refresh_running:
        return False
    if job.server_id is None:
        return False
    return (
        job.server_id not in state.running_servers
        and job.server_id not in state.blocked_servers_by_overlay
    )
def build_scheduler_state(session: Session) -> SchedulerState: def build_scheduler_state(session: Session) -> SchedulerState:
@ -46,8 +95,22 @@ def build_scheduler_state(session: Session) -> SchedulerState:
for job in running_jobs: for job in running_jobs:
if job.operation == "install": if job.operation == "install":
state.install_running = True state.install_running = True
elif job.operation == "refresh_workshop_items":
state.refresh_running = True
elif job.operation == "build_overlay" and job.overlay_id is not None:
state.running_overlays.add(job.overlay_id)
elif job.server_id is not None: elif job.server_id is not None:
state.running_servers.add(job.server_id) state.running_servers.add(job.server_id)
if state.running_overlays:
rows = session.execute(
select(Server.id)
.join(Blueprint, Blueprint.id == Server.blueprint_id)
.join(BlueprintOverlay, BlueprintOverlay.blueprint_id == Blueprint.id)
.where(BlueprintOverlay.overlay_id.in_(state.running_overlays))
).all()
state.blocked_servers_by_overlay = {row[0] for row in rows}
return state return state
@ -65,8 +128,22 @@ def claim_next_job() -> int | None:
jobs = db.scalars(select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)).all() jobs = db.scalars(select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)).all()
now = datetime.now(UTC) now = datetime.now(UTC)
for job in jobs: for job in jobs:
malformed_server_job = job.operation != "install" and job.server_id is None malformed_server_job = (
if not malformed_server_job and not can_start(job, state): job.operation in SERVER_OPERATIONS and job.server_id is None
)
malformed_overlay_job = (
job.operation in OVERLAY_OPERATIONS and job.overlay_id is None
)
if malformed_server_job or malformed_overlay_job:
# Mark malformed jobs failed immediately so the scheduler can move on.
job.state = "failed"
job.exit_code = 1
job.started_at = now
job.finished_at = now
job.updated_at = now
db.flush()
continue
if not can_start(job, state):
continue continue
job.state = "running" job.state = "running"
@ -78,6 +155,31 @@ def claim_next_job() -> int | None:
return None return None
def enqueue_build_overlay(session: Session, *, overlay_id: int, user_id: int) -> Job:
    """Insert a `build_overlay` job, coalescing against any already-queued
    (not-yet-running) build for the same overlay. Running jobs are NOT
    coalesced — a fresh add after a build started gets its own job."""
    pending = session.scalar(
        select(Job).where(
            Job.operation == "build_overlay",
            Job.overlay_id == overlay_id,
            Job.state == "queued",
        )
    )
    if pending is not None:
        # Reuse the queued job rather than inserting a duplicate.
        return pending
    new_job = Job(
        user_id=user_id,
        server_id=None,
        overlay_id=overlay_id,
        operation="build_overlay",
        state="queued",
    )
    session.add(new_job)
    session.flush()  # ensure the primary key is populated before returning
    return new_job
def run_worker_once() -> bool: def run_worker_once() -> bool:
job_id = claim_next_job() job_id = claim_next_job()
if job_id is None: if job_id is None:
@ -90,12 +192,14 @@ def run_job(job_id: int) -> None:
from l4d2web.services import l4d2_facade from l4d2web.services import l4d2_facade
server_name = "unknown" server_name = "unknown"
overlay_id_for_job: int | None = None
with session_scope() as db: with session_scope() as db:
job = db.scalar(select(Job).where(Job.id == job_id)) job = db.scalar(select(Job).where(Job.id == job_id))
if job is None: if job is None:
return return
operation = job.operation operation = job.operation
server_id = job.server_id server_id = job.server_id
overlay_id_for_job = job.overlay_id
if server_id is not None: if server_id is not None:
server = db.scalar(select(Server).where(Server.id == server_id)) server = db.scalar(select(Server).where(Server.id == server_id))
if server is not None: if server is not None:
@ -133,6 +237,27 @@ def run_job(job_id: int) -> None:
on_stderr=on_stderr, on_stderr=on_stderr,
should_cancel=should_cancel, should_cancel=should_cancel,
) )
elif operation == "refresh_workshop_items":
_run_with_boundaries(
"refresh",
"workshop items",
_run_refresh_workshop_items,
on_stdout=on_stdout,
on_stderr=on_stderr,
should_cancel=should_cancel,
)
elif operation == "build_overlay":
if overlay_id_for_job is None:
raise ValueError("build_overlay job has no overlay_id")
_run_with_boundaries(
"build",
f"overlay {overlay_id_for_job}",
_run_build_overlay,
overlay_id_for_job,
on_stdout=on_stdout,
on_stderr=on_stderr,
should_cancel=should_cancel,
)
elif operation in SERVER_OPERATIONS and server_id is None: elif operation in SERVER_OPERATIONS and server_id is None:
raise ValueError(f"{operation} job has no server_id") raise ValueError(f"{operation} job has no server_id")
elif operation == "initialize": elif operation == "initialize":
@ -213,6 +338,153 @@ def run_job(job_id: int) -> None:
finish_job(job_id, "failed", 1, error=error) finish_job(job_id, "failed", 1, error=error)
def _run_build_overlay(
    overlay_id: int,
    *,
    on_stdout: Callable[[str], None],
    on_stderr: Callable[[str], None],
    should_cancel: Callable[[], bool],
) -> None:
    """Dispatch a build_overlay job through the builder registry."""
    from l4d2web.services.overlay_builders import BUILDERS

    with session_scope() as db:
        target = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
        if target is None:
            raise ValueError(f"overlay {overlay_id} not found")
        selected_builder = BUILDERS.get(target.type)
        if selected_builder is None:
            raise ValueError(f"no builder registered for overlay type {target.type!r}")
        # Detach overlay before leaving the session so the builder can read its
        # attributes without a stale connection.
        db.expunge(target)
    selected_builder.build(
        target,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        should_cancel=should_cancel,
    )
def _run_refresh_workshop_items(
    *,
    on_stdout: Callable[[str], None],
    on_stderr: Callable[[str], None],
    should_cancel: Callable[[], bool],
) -> list[int]:
    """Refresh metadata for every WorkshopItem, redownload changed items, and
    enqueue (coalesced) build_overlay jobs for any overlay whose items had
    `time_updated` advance or `filename` change.

    Returns the affected overlay_ids for testability.
    """
    from l4d2web.services import steam_workshop
    from l4d2web.services.workshop_paths import workshop_cache_root

    # Snapshot all WorkshopItems for the metadata batch.
    with session_scope() as db:
        items = db.scalars(select(WorkshopItem)).all()
        snapshot = [
            (it.id, it.steam_id, it.time_updated, it.filename) for it in items
        ]
    if not snapshot:
        on_stdout("no workshop items registered; nothing to refresh")
        return []

    steam_ids = [s for _, s, _, _ in snapshot]
    on_stdout(f"fetching metadata for {len(steam_ids)} items")
    metas = steam_workshop.fetch_metadata_batch(steam_ids, mode="refresh")
    metas_by_id = {m.steam_id: m for m in metas}
    on_stdout(f"metadata phase complete (received {len(metas)} entries)")
    if should_cancel():
        on_stderr("refresh cancelled after metadata phase")
        return []

    # Update DB rows + collect items that need (re)download.
    affected_workshop_item_ids: set[int] = set()
    download_metas: list[steam_workshop.WorkshopMetadata] = []
    with session_scope() as db:
        for item_id, steam_id, prior_time_updated, prior_filename in snapshot:
            meta = metas_by_id.get(steam_id)
            wi = db.scalar(select(WorkshopItem).where(WorkshopItem.id == item_id))
            if wi is None:
                continue
            if meta is None:
                # Steam dropped this item from the response.
                wi.last_error = "steam returned no entry for this item"
                continue
            wi.title = meta.title
            wi.filename = meta.filename
            wi.file_url = meta.file_url
            wi.file_size = meta.file_size
            wi.preview_url = meta.preview_url
            wi.last_error = "" if meta.result == 1 else f"steam result {meta.result}"
            if meta.result != 1 or not meta.file_url:
                continue
            # Never-downloaded rows and rows with no stored time_updated count
            # as changed; checking them first also avoids a TypeError from
            # comparing against a None prior_time_updated (assumes the column
            # is nullable before first refresh — TODO confirm schema).
            if (
                wi.last_downloaded_at is None
                or prior_time_updated is None
                or meta.time_updated > prior_time_updated
                or meta.filename != prior_filename
            ):
                wi.time_updated = meta.time_updated
                affected_workshop_item_ids.add(item_id)
                download_metas.append(meta)

    on_stdout(f"downloading {len(download_metas)} items")
    if download_metas:
        report = steam_workshop.refresh_all(
            download_metas, workshop_cache_root(), should_cancel=should_cancel
        )
        on_stdout(
            f"download phase complete (downloaded={report.downloaded} errors={report.errors})"
        )
        if report.errors:
            for steam_id, err in report.per_item_errors.items():
                on_stderr(f"download {steam_id}: {err}")
        # Mark successfully downloaded items.
        with session_scope() as db:
            for meta in download_metas:
                if meta.steam_id in report.per_item_errors:
                    continue
                wi = db.scalar(
                    select(WorkshopItem).where(WorkshopItem.steam_id == meta.steam_id)
                )
                if wi is not None:
                    wi.last_downloaded_at = datetime.now(UTC)

    # Enqueue (coalesced) build_overlay for affected overlays.
    if not affected_workshop_item_ids:
        on_stdout("no overlays needed rebuilding")
        return []
    with session_scope() as db:
        overlay_rows = db.execute(
            select(OverlayWorkshopItem.overlay_id)
            .where(OverlayWorkshopItem.workshop_item_id.in_(affected_workshop_item_ids))
            .distinct()
        ).all()
        affected_overlay_ids = [row[0] for row in overlay_rows]
        for ov_id in affected_overlay_ids:
            overlay = db.scalar(select(Overlay).where(Overlay.id == ov_id))
            if overlay is None:
                continue
            # Owner for the auto-enqueued job: the overlay's user_id when it
            # has one; for system overlays (user_id None) fall back to the
            # user of the most recently created job, and skip the overlay if
            # there is none.
            user_id = overlay.user_id
            if user_id is None:
                user_id = db.scalar(
                    select(Job.user_id).order_by(Job.created_at.desc()).limit(1)
                )
            if user_id is None:
                continue
            enqueue_build_overlay(db, overlay_id=ov_id, user_id=user_id)
    on_stdout(
        f"enqueued build_overlay for {len(affected_overlay_ids)} overlay(s)"
    )
    return list(affected_overlay_ids)
def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None: def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None:
now = datetime.now(UTC) now = datetime.now(UTC)
with session_scope() as db: with session_scope() as db:

View file

@ -8,16 +8,33 @@ from sqlalchemy import select
from l4d2web.auth import hash_password from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, Job, Server, User from l4d2web.models import (
Blueprint,
BlueprintOverlay,
Job,
Overlay,
OverlayWorkshopItem,
Server,
User,
WorkshopItem,
)
from l4d2web.services import l4d2_facade from l4d2web.services import l4d2_facade
from l4d2web.services.host_commands import CommandCancelledError from l4d2web.services.host_commands import CommandCancelledError
from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs, run_worker_once from l4d2web.services.job_worker import (
SchedulerState,
build_scheduler_state,
can_start,
enqueue_build_overlay,
recover_stale_jobs,
run_worker_once,
)
@dataclass @dataclass
class DummyJob: class DummyJob:
operation: str operation: str
server_id: int | None = None server_id: int | None = None
overlay_id: int | None = None
@pytest.fixture @pytest.fixture
@ -65,12 +82,14 @@ def add_job(
server_id: int | None, server_id: int | None,
state: str = "queued", state: str = "queued",
created_at: datetime | None = None, created_at: datetime | None = None,
overlay_id: int | None = None,
) -> int: ) -> int:
now = datetime.now(UTC) now = datetime.now(UTC)
with session_scope() as session: with session_scope() as session:
job = Job( job = Job(
user_id=user_id, user_id=user_id,
server_id=server_id, server_id=server_id,
overlay_id=overlay_id,
operation=operation, operation=operation,
state=state, state=state,
created_at=created_at or now, created_at=created_at or now,
@ -379,3 +398,192 @@ def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> N
app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
assert called == [app] assert called == [app]
# ---------------------------------------------------------------------------
# Scheduler truth table for the new operations (build_overlay,
# refresh_workshop_items) and their interaction with existing ops.
# ---------------------------------------------------------------------------
def test_install_blocks_build_overlay_and_refresh() -> None:
    """A running install is a global mutex for the new operation types."""
    state = SchedulerState(install_running=True)
    build_job = DummyJob(operation="build_overlay", overlay_id=1)
    refresh_job = DummyJob(operation="refresh_workshop_items")
    assert can_start(build_job, state) is False
    assert can_start(refresh_job, state) is False
def test_refresh_blocks_install_build_overlay_and_servers() -> None:
    """A running refresh blocks every other operation kind."""
    state = SchedulerState(refresh_running=True)
    blocked_jobs = [
        DummyJob(operation="install"),
        DummyJob(operation="build_overlay", overlay_id=1),
        DummyJob(operation="start", server_id=1),
    ]
    for job in blocked_jobs:
        assert can_start(job, state) is False
def test_build_overlay_blocks_same_overlay_only() -> None:
    """Builds serialize per overlay; distinct overlays may run in parallel."""
    state = SchedulerState(running_overlays={7})
    same_overlay = DummyJob(operation="build_overlay", overlay_id=7)
    other_overlay = DummyJob(operation="build_overlay", overlay_id=8)
    assert can_start(same_overlay, state) is False
    assert can_start(other_overlay, state) is True
def test_install_blocked_by_active_build_overlay() -> None:
    """install must wait until no overlay build is running."""
    state = SchedulerState(running_overlays={7})
    assert can_start(DummyJob(operation="install"), state) is False
def test_refresh_blocked_by_active_build_overlay() -> None:
    """refresh_workshop_items must wait until no overlay build is running."""
    state = SchedulerState(running_overlays={7})
    assert can_start(DummyJob(operation="refresh_workshop_items"), state) is False
def test_server_job_blocked_when_blueprint_overlay_is_building() -> None:
    """Only servers whose blueprint uses the building overlay are blocked."""
    state = SchedulerState(
        running_overlays={7},
        blocked_servers_by_overlay={42},
    )
    assert can_start(DummyJob(operation="start", server_id=42), state) is False
    # Servers whose blueprints don't reference overlay 7 stay schedulable.
    assert can_start(DummyJob(operation="start", server_id=43), state) is True
@pytest.fixture
def overlay_seeded_worker(seeded_worker):
    """Extend seeded_worker with a workshop overlay attached to server_one's
    blueprint, while server_two is moved onto an overlay-free blueprint."""
    app, ids = seeded_worker
    with session_scope() as s:
        overlay = Overlay(name="ws", path="9", type="workshop", user_id=ids.user)
        s.add(overlay)
        s.flush()
        blueprint = s.scalar(select(Blueprint).where(Blueprint.user_id == ids.user))
        s.add(BlueprintOverlay(blueprint_id=blueprint.id, overlay_id=overlay.id, position=0))
        # server_two gets a blueprint with NO workshop overlay so tests can
        # distinguish "blocked by overlay build" from "any server is blocked".
        plain_bp = Blueprint(user_id=ids.user, name="no-overlay", arguments="[]", config="[]")
        s.add(plain_bp)
        s.flush()
        second = s.scalar(select(Server).where(Server.id == ids.server_two))
        second.blueprint_id = plain_bp.id
        ids.overlay = overlay.id
    return app, ids
def test_scheduler_state_finds_servers_blocked_by_running_build(overlay_seeded_worker) -> None:
    """build_scheduler_state marks only servers whose blueprint references
    the overlay that is currently building."""
    app, ids = overlay_seeded_worker
    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
    with session_scope() as s:
        state = build_scheduler_state(s)
        assert ids.overlay in state.running_overlays
        assert ids.server_one in state.blocked_servers_by_overlay
        assert ids.server_two not in state.blocked_servers_by_overlay
def test_enqueue_build_overlay_creates_new_job_when_none_pending(overlay_seeded_worker) -> None:
    """With no pending build for the overlay, a fresh queued job is created."""
    app, ids = overlay_seeded_worker
    with session_scope() as s:
        created = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
        assert created.operation == "build_overlay"
        assert created.overlay_id == ids.overlay
        assert created.server_id is None
        assert created.state == "queued"
def test_enqueue_build_overlay_coalesces_against_pending(overlay_seeded_worker) -> None:
    """A second enqueue for the same overlay reuses the queued job."""
    app, ids = overlay_seeded_worker
    with session_scope() as s:
        first_id = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user).id
    with session_scope() as s:
        second = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
        assert second.id == first_id, "should coalesce against the pending job"
    with session_scope() as s:
        total = s.query(Job).filter_by(operation="build_overlay", overlay_id=ids.overlay).count()
        assert total == 1
def test_enqueue_build_overlay_does_not_coalesce_against_running(overlay_seeded_worker) -> None:
    """A build that already started does not absorb new enqueues."""
    app, ids = overlay_seeded_worker
    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
    with session_scope() as s:
        fresh = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
        assert fresh.state == "queued"
    with session_scope() as s:
        def jobs_in(job_state: str):
            return s.scalars(
                select(Job).where(
                    Job.operation == "build_overlay",
                    Job.overlay_id == ids.overlay,
                    Job.state == job_state,
                )
            ).all()

        assert len(jobs_in("running")) == 1
        assert len(jobs_in("queued")) == 1
def test_run_worker_once_dispatches_build_overlay(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
    """End-to-end: a queued build_overlay job runs the workshop builder and
    symlinks the cached vpk into the overlay's addons directory."""
    import os

    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    with session_scope() as s:
        item = WorkshopItem(
            steam_id="1001", title="A", filename="a.vpk",
            file_url="u", file_size=3, time_updated=1,
        )
        s.add(item)
        s.flush()
        s.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=item.id))
    cache_dir = tmp_path / "workshop_cache"
    cache_dir.mkdir()
    cached_vpk = cache_dir / "1001.vpk"
    cached_vpk.write_bytes(b"abc")
    os.utime(cached_vpk, (1, 1))
    # Mark item as downloaded.
    with session_scope() as s:
        row = s.query(WorkshopItem).filter_by(steam_id="1001").one()
        row.last_downloaded_at = datetime.now(UTC)
    job_id = add_job(ids.user, "build_overlay", server_id=None, overlay_id=ids.overlay)
    with app.app_context():
        assert run_worker_once() is True
    job = load_job(job_id)
    assert job.state == "succeeded", job.exit_code
    addons = tmp_path / "overlays" / "9" / "left4dead2" / "addons"
    assert (addons / "1001.vpk").is_symlink()
def test_run_worker_once_dispatches_refresh(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
    """The worker routes refresh_workshop_items jobs to the refresh helper
    and finishes the job as succeeded.

    Fix: the original also imported `steam_workshop` here but never used it;
    the unused import is removed.
    """
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    from l4d2web.services import job_worker

    refresh_calls = []

    def fake_refresh_workshop_items(*, on_stdout, on_stderr, should_cancel):
        refresh_calls.append(True)
        on_stdout("refresh phase complete (downloaded=0 errors=0)")
        return []  # no overlays affected

    monkeypatch.setattr(job_worker, "_run_refresh_workshop_items", fake_refresh_workshop_items)
    job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)
    with app.app_context():
        assert run_worker_once() is True
    assert refresh_calls == [True]
    job = load_job(job_id)
    assert job.state == "succeeded"