diff --git a/l4d2web/services/job_worker.py b/l4d2web/services/job_worker.py
index 5d41ad5..3afa47e 100644
--- a/l4d2web/services/job_worker.py
+++ b/l4d2web/services/job_worker.py
@@ -10,13 +10,24 @@ from sqlalchemy.orm import Session
 from typing import Callable
 
 from l4d2web.db import session_scope
-from l4d2web.models import Job, JobLog, Server
+from l4d2web.models import (
+    Blueprint,
+    BlueprintOverlay,
+    Job,
+    JobLog,
+    Overlay,
+    OverlayWorkshopItem,
+    Server,
+    WorkshopItem,
+)
 from l4d2web.services.host_commands import CommandCancelledError
 
 
 TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
 ACTIVE_JOB_STATES = {"running", "cancelling"}
 SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}
+OVERLAY_OPERATIONS = {"build_overlay"}
+GLOBAL_OPERATIONS = {"install", "refresh_workshop_items"}
 
 _claim_lock = threading.Lock()
 _log_lock = threading.RLock()
@@ -27,17 +38,55 @@ _workers_started = False
 @dataclass
 class SchedulerState:
     install_running: bool = False
+    refresh_running: bool = False
     running_servers: set[int] = field(default_factory=set)
+    running_overlays: set[int] = field(default_factory=set)
+    blocked_servers_by_overlay: set[int] = field(default_factory=set)
 
 
 def can_start(job, state: SchedulerState) -> bool:
-    if job.operation == "install":
-        return (not state.install_running) and (len(state.running_servers) == 0)
-    if state.install_running:
+    """Truth table for the worker's claim policy.
+
+    install and refresh_workshop_items are global mutexes — they block each
+    other, all build_overlay jobs, and all server jobs.
+
+    build_overlay(overlay_id=N) is per-overlay: it blocks on install/refresh
+    and on another build for the same overlay. Different overlays may build
+    concurrently.
+
+    Server operations (initialize/start/stop/delete) block on install/refresh
+    and on a build_overlay for any overlay referenced by the server's
+    blueprint.
+    """
+    if job.operation in GLOBAL_OPERATIONS:
+        return (
+            not state.install_running
+            and not state.refresh_running
+            and len(state.running_servers) == 0
+            and len(state.running_overlays) == 0
+        )
+    if job.operation == "build_overlay":
+        if state.install_running or state.refresh_running:
+            return False
+        if job.overlay_id is None:
+            return False
+        return job.overlay_id not in state.running_overlays
+    # Server operations from here on.
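+    # Example: start(server_id=42) is claimable only when no install/refresh
+    # is running, server 42 has no other active job, and no overlay referenced
+    # by its blueprint is currently building.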
+    if state.install_running or state.refresh_running:
         return False
     if job.server_id is None:
         return False
-    return job.server_id not in state.running_servers
+    if job.server_id in state.running_servers:
+        return False
+    if job.server_id in state.blocked_servers_by_overlay:
+        return False
+    return True
 
 
 def build_scheduler_state(session: Session) -> SchedulerState:
@@ -46,8 +95,22 @@ def build_scheduler_state(session: Session) -> SchedulerState:
     for job in running_jobs:
         if job.operation == "install":
             state.install_running = True
+        elif job.operation == "refresh_workshop_items":
+            state.refresh_running = True
+        elif job.operation == "build_overlay" and job.overlay_id is not None:
+            state.running_overlays.add(job.overlay_id)
         elif job.server_id is not None:
             state.running_servers.add(job.server_id)
+
+    if state.running_overlays:
+        # A server is blocked while any overlay on its blueprint is building.
+        rows = session.execute(
+            select(Server.id)
+            .join(Blueprint, Blueprint.id == Server.blueprint_id)
+            .join(BlueprintOverlay, BlueprintOverlay.blueprint_id == Blueprint.id)
+            .where(BlueprintOverlay.overlay_id.in_(state.running_overlays))
+        ).all()
+        state.blocked_servers_by_overlay = {row[0] for row in rows}
+
     return state
 
 
@@ -65,8 +128,22 @@ def claim_next_job() -> int | None:
         jobs = db.scalars(select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)).all()
         now = datetime.now(UTC)
         for job in jobs:
-            malformed_server_job = job.operation != "install" and job.server_id is None
-            if not malformed_server_job and not can_start(job, state):
+            malformed_server_job = (
+                job.operation in SERVER_OPERATIONS and job.server_id is None
+            )
+            malformed_overlay_job = (
+                job.operation in OVERLAY_OPERATIONS and job.overlay_id is None
+            )
+            if malformed_server_job or malformed_overlay_job:
+                # Mark malformed jobs failed immediately so the scheduler can move on.
+                job.state = "failed"
+                job.exit_code = 1
+                job.started_at = now
+                job.finished_at = now
+                job.updated_at = now
+                db.flush()
+                continue
+            if not can_start(job, state):
                 continue
 
             job.state = "running"
@@ -78,6 +155,31 @@
     return None
 
 
+def enqueue_build_overlay(session: Session, *, overlay_id: int, user_id: int) -> Job:
+    """Insert a `build_overlay` job, coalescing against any already-queued
+    (not-yet-running) build for the same overlay. Running jobs are NOT
+    coalesced — a fresh add after a build started gets its own job."""
+    existing = session.scalar(
+        select(Job).where(
+            Job.operation == "build_overlay",
+            Job.overlay_id == overlay_id,
+            Job.state == "queued",
+        )
+    )
+    if existing is not None:
+        return existing
+    job = Job(
+        user_id=user_id,
+        server_id=None,
+        overlay_id=overlay_id,
+        operation="build_overlay",
+        state="queued",
+    )
+    session.add(job)
+    session.flush()
+    return job
+
+
 def run_worker_once() -> bool:
     job_id = claim_next_job()
     if job_id is None:
@@ -90,12 +192,14 @@ def run_job(job_id: int) -> None:
     from l4d2web.services import l4d2_facade
 
     server_name = "unknown"
+    overlay_id_for_job: int | None = None
     with session_scope() as db:
         job = db.scalar(select(Job).where(Job.id == job_id))
         if job is None:
            return
         operation = job.operation
         server_id = job.server_id
+        overlay_id_for_job = job.overlay_id
         if server_id is not None:
             server = db.scalar(select(Server).where(Server.id == server_id))
             if server is not None:
@@ -133,6 +237,27 @@ def run_job(job_id: int) -> None:
                 on_stderr=on_stderr,
                 should_cancel=should_cancel,
             )
+        elif operation == "refresh_workshop_items":
+            _run_with_boundaries(
+                "refresh",
+                "workshop items",
+                _run_refresh_workshop_items,
+                on_stdout=on_stdout,
+                on_stderr=on_stderr,
+                should_cancel=should_cancel,
+            )
+        elif operation == "build_overlay":
+            if overlay_id_for_job is None:
+                raise ValueError("build_overlay job has no overlay_id")
+            _run_with_boundaries(
+                "build",
+                f"overlay {overlay_id_for_job}",
+                _run_build_overlay,
+                overlay_id_for_job,
+                on_stdout=on_stdout,
+                on_stderr=on_stderr,
+                should_cancel=should_cancel,
+            )
         elif operation in SERVER_OPERATIONS and server_id is None:
             raise ValueError(f"{operation} job has no server_id")
         elif operation == "initialize":
@@ -213,6 +338,153 @@ def run_job(job_id: int) -> None:
         finish_job(job_id, "failed", 1, error=error)
 
 
+def _run_build_overlay(
+    overlay_id: int,
+    *,
+    on_stdout: Callable[[str], None],
+    on_stderr: Callable[[str], None],
+    should_cancel: Callable[[], bool],
+) -> None:
+    """Dispatch a build_overlay job through the builder registry."""
+    from l4d2web.services.overlay_builders import BUILDERS
+
+    with session_scope() as db:
+        overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
+        if overlay is None:
+            raise ValueError(f"overlay {overlay_id} not found")
+        builder = BUILDERS.get(overlay.type)
+        if builder is None:
+            raise ValueError(f"no builder registered for overlay type {overlay.type!r}")
+        # Detach the overlay before the session closes so the builder can read
+        # its already-loaded attributes without touching a closed session.
+        db.expunge(overlay)
+
+    builder.build(
+        overlay,
+        on_stdout=on_stdout,
+        on_stderr=on_stderr,
+        should_cancel=should_cancel,
+    )
+
+
+def _run_refresh_workshop_items(
+    *,
+    on_stdout: Callable[[str], None],
+    on_stderr: Callable[[str], None],
+    should_cancel: Callable[[], bool],
+) -> list[int]:
+    """Refresh metadata for every WorkshopItem, redownload changed items, and
+    enqueue (coalesced) build_overlay jobs for any overlay whose items had
+    `time_updated` advance or `filename` change. Returns the affected
+    overlay_ids for testability."""
+    from l4d2web.services import steam_workshop
+    from l4d2web.services.workshop_paths import workshop_cache_root
+
+    # Snapshot all WorkshopItems for the metadata batch.
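+    # A short-lived session on purpose: the metadata fetch below is a network
+    # round-trip, and the snapshot lets us compare prior values later without
+    # holding a DB session open across it.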
+    with session_scope() as db:
+        items = db.scalars(select(WorkshopItem)).all()
+        snapshot = [
+            (it.id, it.steam_id, it.time_updated, it.filename) for it in items
+        ]
+
+    if not snapshot:
+        on_stdout("no workshop items registered; nothing to refresh")
+        return []
+
+    steam_ids = [s for _, s, _, _ in snapshot]
+    on_stdout(f"fetching metadata for {len(steam_ids)} items")
+    metas = steam_workshop.fetch_metadata_batch(steam_ids, mode="refresh")
+    metas_by_id = {m.steam_id: m for m in metas}
+    on_stdout(f"metadata phase complete (received {len(metas)} entries)")
+
+    if should_cancel():
+        on_stderr("refresh cancelled after metadata phase")
+        return []
+
+    # Update DB rows and collect the items that need a (re)download.
+    affected_workshop_item_ids: set[int] = set()
+    download_metas: list[steam_workshop.WorkshopMetadata] = []
+    with session_scope() as db:
+        for item_id, steam_id, prior_time_updated, prior_filename in snapshot:
+            meta = metas_by_id.get(steam_id)
+            wi = db.scalar(select(WorkshopItem).where(WorkshopItem.id == item_id))
+            if wi is None:
+                continue
+            if meta is None:
+                # Steam dropped this item from the response.
+                wi.last_error = "steam returned no entry for this item"
+                continue
+            wi.title = meta.title
+            wi.filename = meta.filename
+            wi.file_url = meta.file_url
+            wi.file_size = meta.file_size
+            wi.preview_url = meta.preview_url
+            wi.last_error = "" if meta.result == 1 else f"steam result {meta.result}"
+            if meta.result != 1 or not meta.file_url:
+                continue
+            # An item is (re)downloaded when Steam reports a newer time_updated,
+            # the filename changed, or it was never downloaded.
+            if (
+                meta.time_updated > prior_time_updated
+                or meta.filename != prior_filename
+                or wi.last_downloaded_at is None
+            ):
+                wi.time_updated = meta.time_updated
+                affected_workshop_item_ids.add(item_id)
+                download_metas.append(meta)
+
+    on_stdout(f"downloading {len(download_metas)} items")
+    if download_metas:
+        report = steam_workshop.refresh_all(
+            download_metas, workshop_cache_root(), should_cancel=should_cancel
+        )
+        on_stdout(
+            f"download phase complete (downloaded={report.downloaded} errors={report.errors})"
+        )
+        if report.errors:
+            for steam_id, err in report.per_item_errors.items():
+                on_stderr(f"download {steam_id}: {err}")
+        # Record download results. A failed download clears last_downloaded_at
+        # so the next refresh retries the item even though its new time_updated
+        # and filename were already persisted above.
+        with session_scope() as db:
+            for meta in download_metas:
+                wi = db.scalar(select(WorkshopItem).where(WorkshopItem.steam_id == meta.steam_id))
+                if wi is None:
+                    continue
+                if meta.steam_id in report.per_item_errors:
+                    wi.last_downloaded_at = None
+                else:
+                    wi.last_downloaded_at = datetime.now(UTC)
+
+    # Enqueue (coalesced) build_overlay jobs for the affected overlays.
+    if not affected_workshop_item_ids:
+        on_stdout("no overlays needed rebuilding")
+        return []
+
+    with session_scope() as db:
+        overlay_rows = db.execute(
+            select(OverlayWorkshopItem.overlay_id)
+            .where(OverlayWorkshopItem.workshop_item_id.in_(affected_workshop_item_ids))
+            .distinct()
+        ).all()
+        affected_overlay_ids = [row[0] for row in overlay_rows]
+        for ov_id in affected_overlay_ids:
+            # Find an owner for the auto-enqueued job: the overlay's user_id
+            # if it has one, else (best effort) the user of the most recent
+            # job.
+            overlay = db.scalar(select(Overlay).where(Overlay.id == ov_id))
+            if overlay is None:
+                continue
+            user_id = overlay.user_id
+            if user_id is None:
+                # System overlay: attribute it to the most recent job's user.
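+                # (Best effort on purpose: the owner only attributes the job;
+                # which user is picked does not change the build itself.)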
+                user_id = db.scalar(
+                    select(Job.user_id).order_by(Job.created_at.desc()).limit(1)
+                )
+            if user_id is None:
+                continue
+            enqueue_build_overlay(db, overlay_id=ov_id, user_id=user_id)
+    on_stdout(
+        f"enqueued build_overlay for {len(affected_overlay_ids)} overlay(s)"
+    )
+    return list(affected_overlay_ids)
+
+
 def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None:
     now = datetime.now(UTC)
     with session_scope() as db:
diff --git a/l4d2web/tests/test_job_worker.py b/l4d2web/tests/test_job_worker.py
index fa80dda..70f61a0 100644
--- a/l4d2web/tests/test_job_worker.py
+++ b/l4d2web/tests/test_job_worker.py
@@ -8,16 +8,33 @@ from sqlalchemy import select
 from l4d2web.auth import hash_password
 from l4d2web.db import init_db, session_scope
-from l4d2web.models import Blueprint, Job, Server, User
+from l4d2web.models import (
+    Blueprint,
+    BlueprintOverlay,
+    Job,
+    Overlay,
+    OverlayWorkshopItem,
+    Server,
+    User,
+    WorkshopItem,
+)
 from l4d2web.services import l4d2_facade
 from l4d2web.services.host_commands import CommandCancelledError
-from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs, run_worker_once
+from l4d2web.services.job_worker import (
+    SchedulerState,
+    build_scheduler_state,
+    can_start,
+    enqueue_build_overlay,
+    recover_stale_jobs,
+    run_worker_once,
+)
 
 
 @dataclass
 class DummyJob:
     operation: str
     server_id: int | None = None
+    overlay_id: int | None = None
 
 
 @pytest.fixture
@@ -65,12 +82,14 @@ def add_job(
     server_id: int | None,
     state: str = "queued",
     created_at: datetime | None = None,
+    overlay_id: int | None = None,
 ) -> int:
     now = datetime.now(UTC)
     with session_scope() as session:
         job = Job(
             user_id=user_id,
             server_id=server_id,
+            overlay_id=overlay_id,
             operation=operation,
             state=state,
             created_at=created_at or now,
@@ -379,3 +398,192 @@ def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
     app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
 
     assert called == [app]
+
+
+# ---------------------------------------------------------------------------
+# Scheduler truth table for the new operations (build_overlay,
+# refresh_workshop_items) and their interaction with existing ops.
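+# install/refresh act as global mutexes; build_overlay is a per-overlay mutex
+# that additionally blocks server jobs whose blueprints use that overlay.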
+# --------------------------------------------------------------------------- + + +def test_install_blocks_build_overlay_and_refresh() -> None: + state = SchedulerState(install_running=True) + assert can_start(DummyJob(operation="build_overlay", overlay_id=1), state) is False + assert can_start(DummyJob(operation="refresh_workshop_items"), state) is False + + +def test_refresh_blocks_install_build_overlay_and_servers() -> None: + state = SchedulerState(refresh_running=True) + assert can_start(DummyJob(operation="install"), state) is False + assert can_start(DummyJob(operation="build_overlay", overlay_id=1), state) is False + assert can_start(DummyJob(operation="start", server_id=1), state) is False + + +def test_build_overlay_blocks_same_overlay_only() -> None: + state = SchedulerState() + state.running_overlays.add(7) + assert can_start(DummyJob(operation="build_overlay", overlay_id=7), state) is False + assert can_start(DummyJob(operation="build_overlay", overlay_id=8), state) is True + + +def test_install_blocked_by_active_build_overlay() -> None: + state = SchedulerState() + state.running_overlays.add(7) + assert can_start(DummyJob(operation="install"), state) is False + + +def test_refresh_blocked_by_active_build_overlay() -> None: + state = SchedulerState() + state.running_overlays.add(7) + assert can_start(DummyJob(operation="refresh_workshop_items"), state) is False + + +def test_server_job_blocked_when_blueprint_overlay_is_building() -> None: + state = SchedulerState() + state.running_overlays.add(7) + state.blocked_servers_by_overlay.add(42) + assert can_start(DummyJob(operation="start", server_id=42), state) is False + # Other servers (whose blueprints don't reference overlay 7) are NOT blocked. + assert can_start(DummyJob(operation="start", server_id=43), state) is True + + +@pytest.fixture +def overlay_seeded_worker(seeded_worker): + app, ids = seeded_worker + with session_scope() as s: + overlay = Overlay(name="ws", path="9", type="workshop", user_id=ids.user) + s.add(overlay) + s.flush() + # Move server_two onto a different blueprint with NO workshop overlay, + # so the test can distinguish "blocked by overlay build" from "any + # server is blocked". 
+        bp_with_overlay = s.scalar(select(Blueprint).where(Blueprint.user_id == ids.user))
+        s.add(BlueprintOverlay(blueprint_id=bp_with_overlay.id, overlay_id=overlay.id, position=0))
+        bp_without = Blueprint(user_id=ids.user, name="no-overlay", arguments="[]", config="[]")
+        s.add(bp_without)
+        s.flush()
+        server_two = s.scalar(select(Server).where(Server.id == ids.server_two))
+        server_two.blueprint_id = bp_without.id
+        ids.overlay = overlay.id
+    return app, ids
+
+
+def test_scheduler_state_finds_servers_blocked_by_running_build(overlay_seeded_worker) -> None:
+    app, ids = overlay_seeded_worker
+    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
+
+    with session_scope() as s:
+        state = build_scheduler_state(s)
+
+    assert ids.overlay in state.running_overlays
+    assert ids.server_one in state.blocked_servers_by_overlay
+    assert ids.server_two not in state.blocked_servers_by_overlay
+
+
+def test_enqueue_build_overlay_creates_new_job_when_none_pending(overlay_seeded_worker) -> None:
+    app, ids = overlay_seeded_worker
+
+    with session_scope() as s:
+        job = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
+        assert job.operation == "build_overlay"
+        assert job.overlay_id == ids.overlay
+        assert job.server_id is None
+        assert job.state == "queued"
+
+
+def test_enqueue_build_overlay_coalesces_against_pending(overlay_seeded_worker) -> None:
+    app, ids = overlay_seeded_worker
+
+    with session_scope() as s:
+        first = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
+        first_id = first.id
+
+    with session_scope() as s:
+        second = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
+        assert second.id == first_id, "should coalesce against the pending job"
+
+    with session_scope() as s:
+        n = s.query(Job).filter_by(operation="build_overlay", overlay_id=ids.overlay).count()
+        assert n == 1
+
+
+def test_enqueue_build_overlay_does_not_coalesce_against_running(overlay_seeded_worker) -> None:
+    app, ids = overlay_seeded_worker
+    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
+
+    with session_scope() as s:
+        new_job = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
+        assert new_job.state == "queued"
+
+    with session_scope() as s:
+        running = s.scalars(
+            select(Job).where(
+                Job.operation == "build_overlay",
+                Job.overlay_id == ids.overlay,
+                Job.state == "running",
+            )
+        ).all()
+        queued = s.scalars(
+            select(Job).where(
+                Job.operation == "build_overlay",
+                Job.overlay_id == ids.overlay,
+                Job.state == "queued",
+            )
+        ).all()
+        assert len(running) == 1
+        assert len(queued) == 1
+
+
+def test_run_worker_once_dispatches_build_overlay(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
+    app, ids = overlay_seeded_worker
+    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
+
+    with session_scope() as s:
+        wi = WorkshopItem(steam_id="1001", title="A", filename="a.vpk", file_url="u", file_size=3, time_updated=1)
+        s.add(wi)
+        s.flush()
+        s.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=wi.id))
+
+    import os
+
+    cache = tmp_path / "workshop_cache"
+    cache.mkdir()
+    (cache / "1001.vpk").write_bytes(b"abc")
+    os.utime(cache / "1001.vpk", (1, 1))
+    # Mark item as downloaded.
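+    # (last_downloaded_at marks the cached file as a completed download; the
+    # builder is expected to link it from the cache, as asserted below.)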
+    with session_scope() as s:
+        wi = s.query(WorkshopItem).filter_by(steam_id="1001").one()
+        wi.last_downloaded_at = datetime.now(UTC)
+
+    job_id = add_job(ids.user, "build_overlay", server_id=None, overlay_id=ids.overlay)
+
+    with app.app_context():
+        assert run_worker_once() is True
+
+    job = load_job(job_id)
+    assert job.state == "succeeded", job.exit_code
+    addons = tmp_path / "overlays" / "9" / "left4dead2" / "addons"
+    assert (addons / "1001.vpk").is_symlink()
+
+
+def test_run_worker_once_dispatches_refresh(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
+    app, ids = overlay_seeded_worker
+    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
+
+    from l4d2web.services import job_worker
+
+    refresh_calls = []
+
+    def fake_refresh_workshop_items(*, on_stdout, on_stderr, should_cancel):
+        refresh_calls.append(True)
+        on_stdout("refresh phase complete (downloaded=0 errors=0)")
+        return []  # no overlays affected
+
+    monkeypatch.setattr(job_worker, "_run_refresh_workshop_items", fake_refresh_workshop_items)
+
+    job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)
+
+    with app.app_context():
+        assert run_worker_once() is True
+
+    assert refresh_calls == [True]
+    job = load_job(job_id)
+    assert job.state == "succeeded"