"""Job scheduling and background worker threads.

Jobs are rows in the ``Job`` table. Worker threads claim queued jobs one at a
time, run the matching ``l4d2_facade`` operation, stream its output into
``JobLog`` rows and finally record a terminal state on the job.
"""

from dataclasses import dataclass, field
from datetime import UTC, datetime
import logging
import subprocess
import threading
import time

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from l4d2web.db import session_scope
from l4d2web.models import Job, JobLog, Server
from l4d2web.services.host_commands import CommandCancelledError

logger = logging.getLogger(__name__)

TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
ACTIVE_JOB_STATES = {"running", "cancelling"}
SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}

# Serialises job claiming between the worker threads of this process.
_claim_lock = threading.Lock()
# Re-entrant because append_job_log_line() holds it while calling
# append_job_log(), which acquires it again.
_log_lock = threading.RLock()
# Guards the one-time start of the worker threads.
_worker_start_lock = threading.Lock()
_workers_started = False


@dataclass
class SchedulerState:
    """Snapshot of the work that is currently active."""

    # True while an "install" job is in an active state.
    install_running: bool = False
    # Ids of the servers that currently have an active job.
    running_servers: set[int] = field(default_factory=set)


def can_start(job, state: SchedulerState) -> bool:
    """Return whether *job* may start given the currently active work.

    An install job needs exclusive access: no other install and no server job
    may be active. A server job may start only when no install is active, it
    has a ``server_id``, and that server has no other active job.
    """
    if job.operation == "install":
        return (not state.install_running) and not state.running_servers
    if state.install_running:
        return False
    if job.server_id is None:
        return False
    return job.server_id not in state.running_servers


def build_scheduler_state(session: Session) -> SchedulerState:
    """Build a ``SchedulerState`` from the jobs in ``ACTIVE_JOB_STATES``."""
    state = SchedulerState()
    running_jobs = session.scalars(
        select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))
    ).all()
    for job in running_jobs:
        if job.operation == "install":
            state.install_running = True
        elif job.server_id is not None:
            state.running_servers.add(job.server_id)
    return state


def claim_next_job() -> int | None:
    """Mark the oldest startable queued job as running and return its id.

    Queued jobs are considered in ``(created_at, id)`` order. Returns ``None``
    when no queued job can start right now.
    """
    with _claim_lock:
        with session_scope() as db:
            state = build_scheduler_state(db)
            jobs = db.scalars(
                select(Job)
                .where(Job.state == "queued")
                .order_by(Job.created_at, Job.id)
            ).all()
            now = datetime.now(UTC)
            for job in jobs:
                # A server operation without a server_id can never satisfy
                # can_start(); claim it anyway so that run_job() fails it with
                # an error instead of leaving it queued forever.
                malformed_server_job = (
                    job.operation != "install" and job.server_id is None
                )
                if not malformed_server_job and not can_start(job, state):
                    continue
                job.state = "running"
                job.started_at = now
                job.updated_at = now
                job.exit_code = None
                db.flush()
                return job.id
    return None


def run_worker_once() -> bool:
    """Claim and run at most one job; return whether a job was run."""
    job_id = claim_next_job()
    if job_id is None:
        return False
    run_job(job_id)
    return True


def run_job(job_id: int) -> None:
    """Execute the job with id *job_id* and record its outcome.

    Output lines are appended to the job log, truncated to 4096 characters.
    The job ends as ``succeeded``, ``cancelled`` (on ``CommandCancelledError``)
    or ``failed`` (on any other exception). For server jobs the server's
    actual state is refreshed before the job is finished. Does nothing when
    the job does not exist.
    """
    # Imported lazily, as in the rest of this module.
    # NOTE(review): presumably to avoid an import cycle - confirm.
    from l4d2web.services import l4d2_facade

    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        # Copy the plain values out so they can be used after the session.
        operation = job.operation
        server_id = job.server_id

    max_chars = 4096

    def on_stdout(line: str) -> None:
        append_job_log_line(job_id, "stdout", line, max_chars=max_chars)

    def on_stderr(line: str) -> None:
        append_job_log_line(job_id, "stderr", line, max_chars=max_chars)

    def should_cancel() -> bool:
        # Re-read the state on every call; cancellation is requested by
        # setting the job row to "cancelling".
        with session_scope() as db:
            state = db.scalar(select(Job.state).where(Job.id == job_id))
        return state == "cancelling"

    def raise_if_cancelled() -> None:
        if should_cancel():
            raise CommandCancelledError(
                returncode=1, cmd=[operation], output="", stderr=""
            )

    try:
        if operation == "install":
            l4d2_facade.install_runtime(
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation in SERVER_OPERATIONS and server_id is None:
            raise ValueError(f"{operation} job has no server_id")
        elif operation == "initialize":
            l4d2_facade.initialize_server(
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "start":
            # Starting always (re)initializes first; check for cancellation
            # between the two steps.
            l4d2_facade.initialize_server(
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
            raise_if_cancelled()
            l4d2_facade.start_server(
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "stop":
            l4d2_facade.stop_server(
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        elif operation == "delete":
            l4d2_facade.delete_server(
                server_id,
                on_stdout=on_stdout,
                on_stderr=on_stderr,
                should_cancel=should_cancel,
            )
        else:
            raise ValueError(f"unknown job operation: {operation}")

        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "succeeded", 0)
    except CommandCancelledError as exc:
        # Handled before CalledProcessError so that a cancellation is never
        # recorded as a plain failure.
        error = "job cancelled; runtime state may be partial"
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        exit_code = exc.returncode if exc.returncode is not None else 1
        finish_job(job_id, "cancelled", exit_code, error=error)
    except subprocess.CalledProcessError as exc:
        error = exc.stderr or str(exc)
        if exc.stderr:
            append_job_log_line(
                job_id, "stderr", str(exc.stderr), max_chars=max_chars
            )
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "failed", exc.returncode, error=error)
    except Exception as exc:
        error = str(exc)
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "failed", 1, error=error)


def finish_job(
    job_id: int, state: str, exit_code: int | None, error: str = ""
) -> None:
    """Record the final *state* and *exit_code* of a job.

    When the job belongs to a server, the server's ``last_error`` is cleared
    on success and set to *error* otherwise. Does nothing when the job does
    not exist.
    """
    now = datetime.now(UTC)
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        job.state = state
        job.exit_code = exit_code
        job.finished_at = now
        job.updated_at = now
        if job.server_id is not None:
            server = db.scalar(select(Server).where(Server.id == job.server_id))
            if server is not None:
                server.last_error = "" if state == "succeeded" else error
                server.updated_at = now


def append_job_log_line(
    job_id: int, stream: str, line: str, max_chars: int = 4096
) -> int:
    """Append one log line in its own session and return its sequence number.

    ``_log_lock`` is held for the whole session scope, so the sequence number
    computed by ``append_job_log`` cannot be taken by another thread of this
    process before the scope exits.
    """
    with _log_lock:
        with session_scope() as db:
            return append_job_log(db, job_id, stream, line, max_chars=max_chars)


def recover_stale_jobs() -> int:
    """Close every job that is still in an active state; return the count.

    Jobs in state ``cancelling`` become ``cancelled``, all other active jobs
    become ``failed``; both get exit code 1.
    NOTE(review): presumably called at startup, before workers run - confirm.
    """
    now = datetime.now(UTC)
    with session_scope() as db:
        jobs = db.scalars(
            select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))
        ).all()
        for job in jobs:
            job.state = "cancelled" if job.state == "cancelling" else "failed"
            job.exit_code = 1
            job.finished_at = now
            job.updated_at = now
        return len(jobs)


def append_job_log(
    session: Session,
    job_id: int,
    stream: str,
    line: str,
    max_chars: int = 4096,
) -> int:
    """Add a ``JobLog`` row to *session* and return its sequence number.

    The sequence number is one more than the job's current maximum (starting
    at 1). *line* is truncated to *max_chars* characters. The row is flushed
    but the session is not committed here.
    """
    with _log_lock:
        last_seq = (
            session.scalar(
                select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)
            )
            or 0
        )
        next_seq = int(last_seq) + 1
        session.add(
            JobLog(
                job_id=job_id,
                seq=next_seq,
                stream=stream,
                line=line[:max_chars],
            )
        )
        session.flush()
        return next_seq


def refresh_server_actual_state(server_id: int) -> str:
    """Query the server's status and store it; return the stored state.

    Returns ``"unknown"`` without touching the database when the server row
    does not exist.
    """
    from l4d2web.services import l4d2_facade

    now = datetime.now(UTC)
    with session_scope() as db:
        server = db.scalar(select(Server).where(Server.id == server_id))
        if server is None:
            return "unknown"
        status = l4d2_facade.server_status(server.name)
        server.actual_state = status.state
        server.actual_state_updated_at = now
        server.updated_at = now
        return server.actual_state


def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None:
    """Refresh the server state, logging a failure to the job log.

    A failing status refresh must not change the outcome of the job itself,
    so the exception is reported in the job's stderr log and not re-raised.
    """
    try:
        refresh_server_actual_state(server_id)
    except Exception as exc:
        append_job_log_line(job_id, "stderr", f"status refresh failed: {exc}")


def start_job_workers(app) -> None:
    """Start the daemon worker threads once per process.

    Reads ``JOB_WORKER_THREADS`` (default 4) and ``JOB_WORKER_POLL_SECONDS``
    (default 1) from ``app.config``. Later calls are no-ops.

    Raises:
        ValueError: If a configuration value cannot be converted to a number.
            The workers are then not marked as started, so a later call can
            still start them.
    """
    global _workers_started
    with _worker_start_lock:
        if _workers_started:
            return
        # Parse the configuration before setting the flag: a bad value must
        # not leave the process marked as started with no worker running.
        threads = int(app.config.get("JOB_WORKER_THREADS", 4))
        poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1))
        _workers_started = True

    for index in range(threads):
        thread = threading.Thread(
            target=worker_loop,
            args=(app, poll_seconds),
            name=f"l4d2-job-worker-{index + 1}",
            daemon=True,
        )
        thread.start()


def worker_loop(app, poll_seconds: float) -> None:
    """Run jobs forever, sleeping *poll_seconds* whenever none was run.

    Each iteration runs inside ``app.app_context()``. An exception escaping
    an iteration is logged and treated like an idle iteration, so the thread
    keeps running.
    """
    while True:
        ran_job = False
        try:
            with app.app_context():
                ran_job = run_worker_once()
        except Exception:
            # Keep the worker alive, but leave a trace of what went wrong.
            logger.exception("job worker iteration failed")
            ran_job = False
        if not ran_job:
            time.sleep(poll_seconds)