from dataclasses import dataclass, field
from datetime import UTC, datetime
import subprocess
import threading
import time
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from typing import Callable
from l4d2web.db import session_scope
from l4d2web.models import (
    Blueprint,
    BlueprintOverlay,
    Job,
    JobLog,
    Overlay,
    OverlayWorkshopItem,
    Server,
    WorkshopItem,
)
from l4d2web.services.host_commands import CommandCancelledError

# Job lifecycle states. A job in a terminal state is never re-claimed;
# ACTIVE states are what the scheduler treats as "currently occupying" a
# resource (cancelling jobs still hold their slot until they finish).
TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
ACTIVE_JOB_STATES = {"running", "cancelling"}
# Operation families: server ops require a server_id, overlay ops an
# overlay_id, and global ops act as process-wide mutexes (see can_start).
SERVER_OPERATIONS = {"initialize", "start", "stop", "delete", "reset"}
OVERLAY_OPERATIONS = {"build_overlay"}
GLOBAL_OPERATIONS = {"install", "refresh_workshop_items"}
# Worker count handed to steam_workshop.refresh_all for the download phase.
WORKSHOP_REFRESH_DOWNLOAD_WORKERS = 1

# Process-local locks: _claim_lock serializes job claiming across worker
# threads (see claim_next_job docstring for the single-process caveat);
# _log_lock is an RLock because append_job_log_line acquires it and then
# calls append_job_log, which acquires it again.
_claim_lock = threading.Lock()
_log_lock = threading.RLock()
_worker_start_lock = threading.Lock()
_workers_started = False


@dataclass
class SchedulerState:
    """Snapshot of which jobs are currently active, used by can_start.

    Built fresh per claim attempt by build_scheduler_state.
    """

    # True while an install / refresh_workshop_items job is active.
    install_running: bool = False
    refresh_running: bool = False
    # server_ids / overlay_ids with an active job.
    running_servers: set[int] = field(default_factory=set)
    running_overlays: set[int] = field(default_factory=set)
    # server_ids whose blueprint references an overlay that is building.
    blocked_servers_by_overlay: set[int] = field(default_factory=set)


def can_start(job, state: SchedulerState) -> bool:
    """Truth table for the worker's claim policy.

    install / refresh_workshop_items are global mutexes — they block each
    other, all build_overlay jobs, and all server jobs.

    build_overlay(overlay_id=N) is per-overlay: blocks on install/refresh,
    and on another build for the same overlay. Different overlays may build
    concurrently.

    Server start/init blocks on install/refresh and on a build_overlay for
    any overlay referenced by the server's blueprint.
    """
    if job.operation == "install":
        return (
            not state.install_running
            and not state.refresh_running
            and len(state.running_servers) == 0
            and len(state.running_overlays) == 0
        )
    if job.operation == "refresh_workshop_items":
        # Same gate as install: nothing else may be running.
        return (
            not state.install_running
            and not state.refresh_running
            and len(state.running_servers) == 0
            and len(state.running_overlays) == 0
        )
    if job.operation == "build_overlay":
        if state.install_running or state.refresh_running:
            return False
        if job.overlay_id is None:
            return False
        return job.overlay_id not in state.running_overlays
    # Server operations from here on.
    if state.install_running or state.refresh_running:
        return False
    if job.server_id is None:
        return False
    if job.server_id in state.running_servers:
        return False
    if job.server_id in state.blocked_servers_by_overlay:
        return False
    return True


def build_scheduler_state(session: Session) -> SchedulerState:
    """Build a SchedulerState from the jobs currently in an active state.

    Also resolves, via Blueprint -> BlueprintOverlay joins, which servers are
    blocked because one of their blueprint's overlays is currently building.
    """
    state = SchedulerState()
    running_jobs = session.scalars(select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))).all()
    for job in running_jobs:
        if job.operation == "install":
            state.install_running = True
        elif job.operation == "refresh_workshop_items":
            state.refresh_running = True
        elif job.operation == "build_overlay" and job.overlay_id is not None:
            state.running_overlays.add(job.overlay_id)
        elif job.server_id is not None:
            # Anything else with a server_id is treated as a server op.
            state.running_servers.add(job.server_id)
    if state.running_overlays:
        rows = session.execute(
            select(Server.id)
            .join(Blueprint, Blueprint.id == Server.blueprint_id)
            .join(BlueprintOverlay, BlueprintOverlay.blueprint_id == Blueprint.id)
            .where(BlueprintOverlay.overlay_id.in_(state.running_overlays))
        ).all()
        state.blocked_servers_by_overlay = {row[0] for row in rows}
    return state


def claim_next_job() -> int | None:
    """Atomically claim the next runnable job.

    `_claim_lock` is process-local, so correctness depends on a single web
    process. The deployed unit pins `gunicorn --workers 1`; if that ever
    changes, replace this with a row-level lock (`with_for_update`) to keep
    job claims unique across processes.

    Returns the claimed job's id, or None when nothing is runnable.
    """
    with _claim_lock:
        with session_scope() as db:
            state = build_scheduler_state(db)
            # Oldest-first FIFO; id breaks created_at ties.
            jobs = db.scalars(select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)).all()
            now = datetime.now(UTC)
            for job in jobs:
                malformed_server_job = (
                    job.operation in SERVER_OPERATIONS and job.server_id is None
                )
                malformed_overlay_job = (
                    job.operation in OVERLAY_OPERATIONS and job.overlay_id is None
                )
                if malformed_server_job or malformed_overlay_job:
                    # Mark malformed jobs failed immediately so the scheduler can move on.
                    job.state = "failed"
                    job.exit_code = 1
                    job.started_at = now
                    job.finished_at = now
                    job.updated_at = now
                    db.flush()
                    continue
                if not can_start(job, state):
                    continue
                job.state = "running"
                job.started_at = now
                job.updated_at = now
                job.exit_code = None
                db.flush()
                return job.id
    return None


def enqueue_build_overlay(session: Session, *, overlay_id: int, user_id: int) -> Job:
    """Insert a `build_overlay` job, coalescing against any already-queued
    (not-yet-running) build for the same overlay.

    Running jobs are NOT coalesced — a fresh add after a build started gets
    its own job."""
    existing = session.scalar(
        select(Job).where(
            Job.operation == "build_overlay",
            Job.overlay_id == overlay_id,
            Job.state == "queued",
        )
    )
    if existing is not None:
        return existing
    job = Job(
        user_id=user_id,
        server_id=None,
        overlay_id=overlay_id,
        operation="build_overlay",
        state="queued",
    )
    session.add(job)
    session.flush()
    return job


def run_worker_once() -> bool:
    """Claim and run one job; return True if a job was run."""
    job_id = claim_next_job()
    if job_id is None:
        return False
    run_job(job_id)
    return True


def run_job(job_id: int) -> None:
    """Execute an already-claimed job and record its terminal state.

    Dispatches on the job's operation to the l4d2_facade / overlay / workshop
    helpers, streaming stdout/stderr into the job log. Cancellation is
    cooperative: `should_cancel` polls the job row for state "cancelling",
    and CommandCancelledError maps the job to the "cancelled" state.
    """
    # Local import — presumably to avoid a circular import; confirm.
    from l4d2web.services import l4d2_facade

    server_name = "unknown"
    overlay_id_for_job: int | None = None
    # Read everything needed from the job row up front, then close the
    # session before the (long-running) operation itself.
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        operation = job.operation
        server_id = job.server_id
        overlay_id_for_job = job.overlay_id
        if server_id is not None:
            server = db.scalar(select(Server).where(Server.id == server_id))
            if server is not None:
                server_name = server.name

    # Per-line truncation limit for job log entries.
    max_chars = 4096

    def on_stdout(line: str) -> None:
        append_job_log_line(job_id, "stdout", line, max_chars=max_chars)

    def on_stderr(line: str) -> None:
        append_job_log_line(job_id, "stderr", line, max_chars=max_chars)

    def should_cancel() -> bool:
        # Fresh session per poll so we observe state changes made elsewhere.
        with session_scope() as db:
            state = db.scalar(select(Job.state).where(Job.id == job_id))
            return state == "cancelling"

    def raise_if_cancelled() -> None:
        # Used between the two phases of "start" so a cancel lands between
        # initialize and start instead of after both.
        if should_cancel():
            raise CommandCancelledError(returncode=1, cmd=[operation], output="", stderr="")

    def _run_with_boundaries(action: str, target: str, func: Callable, *args, **kwargs) -> None:
        # Wrap an operation with start/finish log lines for the job log.
        append_job_log_line(job_id, "stdout", f"starting {action} for {target}")
        func(*args, **kwargs)
        append_job_log_line(job_id, "stdout", f"finished {action} successfully")

    try:
        if operation == "install":
            _run_with_boundaries(
                "install",
                "server",
                l4d2_facade.install_runtime,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "refresh_workshop_items":
            _run_with_boundaries(
                "refresh",
                "workshop items",
                _run_refresh_workshop_items,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "build_overlay":
            if overlay_id_for_job is None:
                raise ValueError("build_overlay job has no overlay_id")
            _run_with_boundaries(
                "build",
                f"overlay {overlay_id_for_job}",
                _run_build_overlay,
                overlay_id_for_job,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation in SERVER_OPERATIONS and server_id is None:
            # Defensive guard; claim_next_job normally fails such jobs before
            # they reach here.
            raise ValueError(f"{operation} job has no server_id")
        elif operation == "initialize":
            _run_with_boundaries(
                "initialize",
                server_name,
                l4d2_facade.initialize_server,
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "start":
            # "start" is initialize + start, with a cancellation window
            # between the two phases.
            _run_with_boundaries(
                "initialize",
                server_name,
                l4d2_facade.initialize_server,
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
            raise_if_cancelled()
            _run_with_boundaries(
                "start",
                server_name,
                l4d2_facade.start_server,
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "stop":
            _run_with_boundaries(
                "stop",
                server_name,
                l4d2_facade.stop_server,
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "delete":
            _run_with_boundaries(
                "delete",
                server_name,
                l4d2_facade.delete_server,
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
            # Host-side cleanup succeeded; remove the DB row so the server
            # disappears from /servers. Status refresh is skipped — the
            # systemd unit is gone and querying it would just log noise.
            with session_scope() as db:
                server = db.scalar(select(Server).where(Server.id == server_id))
                if server is not None:
                    db.delete(server)
            finish_job(job_id, "succeeded", 0)
            return
        elif operation == "reset":
            _run_with_boundaries(
                "reset",
                server_name,
                l4d2_facade.reset_server,
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        else:
            raise ValueError(f"unknown job operation: {operation}")
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "succeeded", 0)
    except CommandCancelledError as exc:
        error = "job cancelled; runtime state may be partial"
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        exit_code = exc.returncode if exc.returncode is not None else 1
        finish_job(job_id, "cancelled", exit_code, error=error)
    except subprocess.CalledProcessError as exc:
        error = exc.stderr or str(exc)
        if exc.stderr:
            append_job_log_line(job_id, "stderr", str(exc.stderr), max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "failed", exc.returncode, error=error)
    except Exception as exc:
        # Catch-all boundary: any other failure marks the job failed with
        # exit code 1 and the exception text as the error.
        error = str(exc)
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "failed", 1, error=error)


def _run_build_overlay(
    overlay_id: int,
    *,
    on_stdout: Callable[[str], None],
    on_stderr: Callable[[str], None],
    should_cancel: Callable[[], bool],
) -> None:
    """Dispatch a build_overlay job through the builder registry.

    Raises ValueError if the overlay does not exist or no builder is
    registered for its type.
    """
    from l4d2web.services.overlay_builders import BUILDERS

    with session_scope() as db:
        overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
        if overlay is None:
            raise ValueError(f"overlay {overlay_id} not found")
        builder = BUILDERS.get(overlay.type)
        if builder is None:
            raise ValueError(f"no builder registered for overlay type {overlay.type!r}")
        # Detach overlay before leaving the session so the builder can read its
        # attributes without a stale connection.
        db.expunge(overlay)
    builder.build(
        overlay,
        on_stdout=on_stdout,
        on_stderr=on_stderr,
        should_cancel=should_cancel,
    )


def _run_refresh_workshop_items(
    *,
    on_stdout: Callable[[str], None],
    on_stderr: Callable[[str], None],
    should_cancel: Callable[[], bool],
) -> list[int]:
    """Refresh metadata for every WorkshopItem, redownload changed items, and
    enqueue (coalesced) build_overlay jobs for any overlay whose items had
    `time_updated` advance or `filename` change.

    Returns the affected overlay_ids for testability."""
    from l4d2web.services import steam_workshop
    from l4d2web.services.workshop_paths import workshop_cache_root

    # Snapshot all WorkshopItems for the metadata batch.
    with session_scope() as db:
        items = db.scalars(select(WorkshopItem)).all()
        snapshot = [
            (it.id, it.steam_id, it.time_updated, it.filename) for it in items
        ]
    if not snapshot:
        on_stdout("no workshop items registered; nothing to refresh")
        return []
    steam_ids = [s for _, s, _, _ in snapshot]
    on_stdout(f"fetching metadata for {len(steam_ids)} items")
    metas = steam_workshop.fetch_metadata_batch(steam_ids, mode="refresh")
    metas_by_id = {m.steam_id: m for m in metas}
    on_stdout(f"metadata phase complete (received {len(metas)} entries)")
    if should_cancel():
        on_stderr("refresh cancelled after metadata phase")
        return []
    # Update DB rows + collect items that need (re)download.
    affected_workshop_item_ids: set[int] = set()
    download_metas: list[steam_workshop.WorkshopMetadata] = []
    with session_scope() as db:
        for item_id, steam_id, prior_time_updated, prior_filename in snapshot:
            meta = metas_by_id.get(steam_id)
            wi = db.scalar(select(WorkshopItem).where(WorkshopItem.id == item_id))
            if wi is None:
                # Item was deleted between snapshot and now.
                continue
            if meta is None:
                # Steam dropped this item from the response.
                wi.last_error = "steam returned no entry for this item"
                continue
            wi.title = meta.title
            wi.filename = meta.filename
            wi.file_url = meta.file_url
            wi.file_size = meta.file_size
            wi.preview_url = meta.preview_url
            wi.last_error = "" if meta.result == 1 else f"steam result {meta.result}"
            if meta.result != 1 or not meta.file_url:
                continue
            # Redownload when the item changed upstream, or has never been
            # downloaded at all.
            if (
                meta.time_updated > prior_time_updated
                or meta.filename != prior_filename
                or wi.last_downloaded_at is None
            ):
                # NOTE(review): time_updated is advanced here, before the
                # download phase — if the download later fails, the next
                # refresh will not see the item as changed again. Confirm
                # this is the intended retry behavior.
                wi.time_updated = meta.time_updated
                affected_workshop_item_ids.add(item_id)
                download_metas.append(meta)
    on_stdout(f"downloading {len(download_metas)} items")
    if download_metas:
        report = steam_workshop.refresh_all(
            download_metas,
            workshop_cache_root(),
            executor_workers=WORKSHOP_REFRESH_DOWNLOAD_WORKERS,
            should_cancel=should_cancel,
        )
        on_stdout(
            f"download phase complete (downloaded={report.downloaded} errors={report.errors})"
        )
        if report.errors:
            for steam_id, err in report.per_item_errors.items():
                on_stderr(f"download {steam_id}: {err}")
        # Mark successfully downloaded items.
        with session_scope() as db:
            for meta in download_metas:
                if meta.steam_id in report.per_item_errors:
                    continue
                wi = db.scalar(select(WorkshopItem).where(WorkshopItem.steam_id == meta.steam_id))
                if wi is not None:
                    wi.last_downloaded_at = datetime.now(UTC)
    # Enqueue (coalesced) build_overlay for affected overlays.
    if not affected_workshop_item_ids:
        on_stdout("no overlays needed rebuilding")
        return []
    with session_scope() as db:
        overlay_rows = db.execute(
            select(OverlayWorkshopItem.overlay_id)
            .where(OverlayWorkshopItem.workshop_item_id.in_(affected_workshop_item_ids))
            .distinct()
        ).all()
        affected_overlay_ids = [row[0] for row in overlay_rows]
        for ov_id in affected_overlay_ids:
            # Find a sensible owner for the auto-enqueued job: the overlay's
            # user_id if private, else any admin (best effort) — fall back to
            # the most recent existing job's user_id.
            overlay = db.scalar(select(Overlay).where(Overlay.id == ov_id))
            if overlay is None:
                continue
            user_id = overlay.user_id
            if user_id is None:
                # System overlay — pick any admin user; fall back to first user.
                user_id = db.scalar(
                    select(Job.user_id).order_by(Job.created_at.desc()).limit(1)
                )
            if user_id is None:
                # No candidate owner at all; skip rather than enqueue an
                # ownerless job.
                continue
            enqueue_build_overlay(db, overlay_id=ov_id, user_id=user_id)
    on_stdout(f"enqueued build_overlay for {len(affected_overlay_ids)} overlay(s)")
    return list(affected_overlay_ids)


def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None:
    """Move a job to a terminal state and mirror the outcome onto its
    associated Server (last_error) and Overlay (last_build_status) rows."""
    now = datetime.now(UTC)
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        job.state = state
        job.exit_code = exit_code
        job.finished_at = now
        job.updated_at = now
        if job.server_id is not None:
            server = db.scalar(select(Server).where(Server.id == job.server_id))
            if server is not None:
                # Success clears the server's error; anything else records it.
                server.last_error = "" if state == "succeeded" else error
                server.updated_at = now
        if job.operation == "build_overlay" and job.overlay_id is not None:
            overlay = db.scalar(select(Overlay).where(Overlay.id == job.overlay_id))
            if overlay is not None:
                overlay.last_build_status = "ok" if state == "succeeded" else "failed"
                overlay.updated_at = now


def append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int:
    """Append one log line in its own session; returns the new seq number."""
    with _log_lock:
        with session_scope() as db:
            return append_job_log(db, job_id, stream, line, max_chars=max_chars)


def recover_stale_jobs() -> int:
    """Fail/cancel jobs left in an active state (e.g. after a process crash).

    Returns the number of jobs recovered.
    """
    now = datetime.now(UTC)
    with session_scope() as db:
        jobs = db.scalars(select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))).all()
        for job in jobs:
            # A job caught mid-cancel ends up "cancelled"; a plain running
            # job ends up "failed".
            job.state = "cancelled" if job.state == "cancelling" else "failed"
            job.exit_code = 1
            job.finished_at = now
            job.updated_at = now
        return len(jobs)


def append_job_log(
    session: Session,
    job_id: int,
    stream: str,
    line: str,
    max_chars: int = 4096,
) -> int:
    """Append a JobLog row with the next per-job seq; returns that seq.

    Guarded by the process-local _log_lock (reentrant, so callers that
    already hold it — append_job_log_line — may call through).
    """
    with _log_lock:
        last_seq = session.scalar(select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)) or 0
        next_seq = int(last_seq) + 1
        # Lines are truncated to max_chars before storage.
        session.add(JobLog(job_id=job_id, seq=next_seq, stream=stream, line=line[:max_chars]))
        session.flush()
        return next_seq


def refresh_server_actual_state(server_id: int) -> str:
    """Query the host for a server's live status and persist it.

    Returns the new actual_state, or "unknown" when the server row is gone.
    """
    from l4d2web.services import l4d2_facade

    now = datetime.now(UTC)
    with session_scope() as db:
        server = db.scalar(select(Server).where(Server.id == server_id))
        if server is None:
            return "unknown"
        status = l4d2_facade.server_status(server.name)
        server.actual_state = status.state
        server.actual_state_updated_at = now
        server.updated_at = now
        return server.actual_state


def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None:
    """Best-effort status refresh after a job; failures go to the job log
    instead of propagating (the job's own outcome must not be masked)."""
    try:
        refresh_server_actual_state(server_id)
    except Exception as exc:
        append_job_log_line(job_id, "stderr", f"status refresh failed: {exc}")


def start_job_workers(app) -> None:
    """Start the daemon worker threads once per process.

    Thread count and poll interval come from app config
    (JOB_WORKER_THREADS, default 4; JOB_WORKER_POLL_SECONDS, default 1).
    Idempotent: subsequent calls are no-ops.
    """
    global _workers_started
    with _worker_start_lock:
        if _workers_started:
            return
        _workers_started = True
        threads = int(app.config.get("JOB_WORKER_THREADS", 4))
        poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1))
        for index in range(threads):
            thread = threading.Thread(
                target=worker_loop,
                args=(app, poll_seconds),
                name=f"l4d2-job-worker-{index + 1}",
                daemon=True,
            )
            thread.start()


def worker_loop(app, poll_seconds: float) -> None:
    """Worker thread body: run jobs back-to-back, sleeping poll_seconds
    whenever no job was available (or the attempt raised)."""
    while True:
        ran_job = False
        try:
            with app.app_context():
                ran_job = run_worker_once()
        except Exception:
            # Swallow and back off; the loop must never die.
            ran_job = False
        if not ran_job:
            time.sleep(poll_seconds)