"""Background job scheduling and execution for L4D2 server operations.

Jobs are rows in the ``Job`` table. Worker threads repeatedly claim the next
queued job that is allowed to run, execute it through ``l4d2_facade``, stream
its output into ``JobLog`` rows and finally record the terminal state.
"""

from dataclasses import dataclass, field
from datetime import UTC, datetime
import logging
import subprocess
import threading
import time

from l4d2host.process import CommandCancelledError
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from l4d2web.db import session_scope
from l4d2web.models import Job, JobLog, Server

_logger = logging.getLogger(__name__)

TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
ACTIVE_JOB_STATES = {"running", "cancelling"}
SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}

# Serialises job claiming so two worker threads cannot pick the same job.
_claim_lock = threading.Lock()
# Re-entrant: append_job_log_line() holds it while calling append_job_log(),
# which acquires it again.
_log_lock = threading.RLock()
# Guards the one-time start of the worker threads.
_worker_start_lock = threading.Lock()
_workers_started = False


@dataclass
class SchedulerState:
    """Snapshot of the jobs that are currently active, used to gate new ones."""

    # True while an "install" job is in an active state.
    install_running: bool = False
    # IDs of servers that currently have an active (non-install) job.
    running_servers: set[int] = field(default_factory=set)


def can_start(job, state: SchedulerState) -> bool:
    """Return whether ``job`` may start given the currently active jobs.

    An "install" job needs exclusive access: no other install and no server
    job may be active. Any other job needs a ``server_id``, no active install,
    and no other active job on the same server.
    """
    if job.operation == "install":
        return not state.install_running and not state.running_servers
    if state.install_running:
        return False
    if job.server_id is None:
        return False
    return job.server_id not in state.running_servers


def build_scheduler_state(session: Session) -> SchedulerState:
    """Build a ``SchedulerState`` from the jobs in ``ACTIVE_JOB_STATES``."""
    state = SchedulerState()
    running_jobs = session.scalars(
        select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))
    ).all()
    for job in running_jobs:
        if job.operation == "install":
            state.install_running = True
        elif job.server_id is not None:
            state.running_servers.add(job.server_id)
    return state


def claim_next_job() -> int | None:
    """Mark the oldest runnable queued job as running and return its id.

    Queued jobs are considered in ``(created_at, id)`` order. Returns ``None``
    when no queued job can be started right now.
    """
    with _claim_lock:
        with session_scope() as db:
            state = build_scheduler_state(db)
            jobs = db.scalars(
                select(Job)
                .where(Job.state == "queued")
                .order_by(Job.created_at, Job.id)
            ).all()
            now = datetime.now(UTC)
            for job in jobs:
                # A non-install job without a server can never satisfy
                # can_start(); claim it anyway so run_job() can fail it
                # instead of leaving it queued forever.
                malformed_server_job = (
                    job.operation != "install" and job.server_id is None
                )
                if not malformed_server_job and not can_start(job, state):
                    continue
                job.state = "running"
                job.started_at = now
                job.updated_at = now
                job.exit_code = None
                db.flush()
                return job.id
    return None


def run_worker_once() -> bool:
    """Claim and run at most one job; return whether a job was run."""
    job_id = claim_next_job()
    if job_id is None:
        return False
    run_job(job_id)
    return True


def _stderr_text(exc: subprocess.CalledProcessError) -> str:
    """Return ``exc.stderr`` as text, or ``""`` when it is empty or missing."""
    stderr = exc.stderr
    if not stderr:
        return ""
    if isinstance(stderr, bytes):
        # Decode rather than str(): str(b"...") would yield the "b'...'" repr.
        return stderr.decode("utf-8", errors="replace")
    return str(stderr)


def _conclude_job(
    job_id: int,
    server_id: int | None,
    state: str,
    exit_code: int | None,
    *,
    error: str = "",
    log_line: str | None = None,
    max_chars: int = 4096,
) -> None:
    """Log an optional final line, refresh the server state and finish the job.

    The order (log, refresh, finish) is the same for every outcome of
    ``run_job``. ``log_line=None`` skips the log entry; an empty string is
    still logged.
    """
    if log_line is not None:
        append_job_log_line(job_id, "stderr", log_line, max_chars=max_chars)
    if server_id is not None:
        refresh_server_actual_state_after_job(job_id, server_id)
    finish_job(job_id, state, exit_code, error=error)


def run_job(job_id: int) -> None:
    """Execute the job with the given id and record its terminal state.

    Does nothing when the job row does not exist. Output lines are stored as
    job log entries truncated to 4096 characters. Outcomes:

    * success -> ``succeeded`` with exit code 0;
    * ``CommandCancelledError`` -> ``cancelled``;
    * ``subprocess.CalledProcessError`` -> ``failed`` with its return code;
    * any other exception -> ``failed`` with exit code 1.
    """
    # Imported lazily, as in the original module.
    from l4d2web.services import l4d2_facade

    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        operation = job.operation
        server_id = job.server_id

    max_chars = 4096

    def on_stdout(line: str) -> None:
        append_job_log_line(job_id, "stdout", line, max_chars=max_chars)

    def on_stderr(line: str) -> None:
        append_job_log_line(job_id, "stderr", line, max_chars=max_chars)

    def should_cancel() -> bool:
        # Re-read the state on every call so a cancel request is noticed.
        with session_scope() as db:
            state = db.scalar(select(Job.state).where(Job.id == job_id))
            return state == "cancelling"

    def raise_if_cancelled() -> None:
        if should_cancel():
            raise CommandCancelledError(
                returncode=1, cmd=[operation], output="", stderr=""
            )

    callbacks = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "should_cancel": should_cancel,
    }

    try:
        if operation == "install":
            l4d2_facade.install_runtime(**callbacks)
        elif operation in SERVER_OPERATIONS and server_id is None:
            raise ValueError(f"{operation} job has no server_id")
        elif operation == "initialize":
            l4d2_facade.initialize_server(server_id, **callbacks)
        elif operation == "start":
            # "start" initializes first, then starts, with a cancellation
            # check between the two steps.
            l4d2_facade.initialize_server(server_id, **callbacks)
            raise_if_cancelled()
            l4d2_facade.start_server(server_id, **callbacks)
        elif operation == "stop":
            l4d2_facade.stop_server(server_id, **callbacks)
        elif operation == "delete":
            l4d2_facade.delete_server(server_id, **callbacks)
        else:
            raise ValueError(f"unknown job operation: {operation}")
        _conclude_job(job_id, server_id, "succeeded", 0, max_chars=max_chars)
    except CommandCancelledError as exc:
        # Must precede the CalledProcessError handler so a cancellation is
        # never reported as a plain failure.
        error = "job cancelled; runtime state may be partial"
        exit_code = exc.returncode if exc.returncode is not None else 1
        _conclude_job(
            job_id,
            server_id,
            "cancelled",
            exit_code,
            error=error,
            log_line=error,
            max_chars=max_chars,
        )
    except subprocess.CalledProcessError as exc:
        stderr_text = _stderr_text(exc)
        _conclude_job(
            job_id,
            server_id,
            "failed",
            exc.returncode,
            error=stderr_text or str(exc),
            # Only log when the process actually produced stderr output.
            log_line=stderr_text or None,
            max_chars=max_chars,
        )
    except Exception as exc:
        error = str(exc)
        _conclude_job(
            job_id,
            server_id,
            "failed",
            1,
            error=error,
            log_line=error,
            max_chars=max_chars,
        )


def finish_job(
    job_id: int, state: str, exit_code: int | None, error: str = ""
) -> None:
    """Store the terminal ``state`` and ``exit_code`` on the job.

    When the job belongs to a server, the server's ``last_error`` is cleared
    on success and set to ``error`` otherwise. Does nothing when the job row
    does not exist.
    """
    now = datetime.now(UTC)
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        job.state = state
        job.exit_code = exit_code
        job.finished_at = now
        job.updated_at = now
        if job.server_id is not None:
            server = db.scalar(select(Server).where(Server.id == job.server_id))
            if server is not None:
                server.last_error = "" if state == "succeeded" else error
                server.updated_at = now


def append_job_log_line(
    job_id: int, stream: str, line: str, max_chars: int = 4096
) -> int:
    """Append one log line in its own session and return its sequence number."""
    with _log_lock:
        with session_scope() as db:
            return append_job_log(db, job_id, stream, line, max_chars=max_chars)


def recover_stale_jobs() -> int:
    """Close out every job still in ``ACTIVE_JOB_STATES``.

    Jobs in state ``cancelling`` become ``cancelled``; the others become
    ``failed``. All get exit code 1. Returns the number of jobs updated.
    """
    now = datetime.now(UTC)
    with session_scope() as db:
        jobs = db.scalars(
            select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))
        ).all()
        for job in jobs:
            job.state = "cancelled" if job.state == "cancelling" else "failed"
            job.exit_code = 1
            job.finished_at = now
            job.updated_at = now
        return len(jobs)


def append_job_log(
    session: Session,
    job_id: int,
    stream: str,
    line: str,
    max_chars: int = 4096,
) -> int:
    """Add a ``JobLog`` row for ``job_id`` and return its sequence number.

    The sequence number is one more than the job's current maximum (starting
    at 1). ``line`` is truncated to ``max_chars`` characters.
    """
    # The lock keeps "read max seq, insert max + 1" from interleaving between
    # threads of this process.
    with _log_lock:
        last_seq = (
            session.scalar(
                select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)
            )
            or 0
        )
        next_seq = int(last_seq) + 1
        session.add(
            JobLog(
                job_id=job_id,
                seq=next_seq,
                stream=stream,
                line=line[:max_chars],
            )
        )
        session.flush()
        return next_seq


def refresh_server_actual_state(server_id: int) -> str:
    """Query the server's status and store it as ``actual_state``.

    Returns the new ``actual_state``, or ``"unknown"`` when the server row
    does not exist.
    """
    # Imported lazily, as in the original module.
    from l4d2web.services import l4d2_facade

    now = datetime.now(UTC)
    with session_scope() as db:
        server = db.scalar(select(Server).where(Server.id == server_id))
        if server is None:
            return "unknown"
        status = l4d2_facade.server_status(server.name)
        server.actual_state = status.state
        server.actual_state_updated_at = now
        server.updated_at = now
        return server.actual_state


def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None:
    """Refresh the server state; on failure, log to the job instead of raising."""
    try:
        refresh_server_actual_state(server_id)
    except Exception as exc:
        # Best effort: a failed status refresh must not change the job outcome.
        append_job_log_line(job_id, "stderr", f"status refresh failed: {exc}")


def start_job_workers(app) -> None:
    """Start the daemon worker threads once per process.

    Reads ``JOB_WORKER_THREADS`` (default 4) and ``JOB_WORKER_POLL_SECONDS``
    (default 1) from ``app.config``. Later calls are no-ops.
    """
    global _workers_started
    with _worker_start_lock:
        if _workers_started:
            return
        # Parse the configuration before setting the flag, so an invalid
        # value does not permanently block a later, corrected start attempt.
        threads = int(app.config.get("JOB_WORKER_THREADS", 4))
        poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1))
        _workers_started = True
        for index in range(threads):
            thread = threading.Thread(
                target=worker_loop,
                args=(app, poll_seconds),
                name=f"l4d2-job-worker-{index + 1}",
                daemon=True,
            )
            thread.start()


def worker_loop(app, poll_seconds: float) -> None:
    """Run jobs forever, sleeping ``poll_seconds`` whenever none was run."""
    while True:
        ran_job = False
        try:
            with app.app_context():
                ran_job = run_worker_once()
        except Exception:
            # Keep the worker alive, but record the failure instead of
            # swallowing it silently.
            _logger.exception("job worker iteration failed")
            ran_job = False
        if not ran_job:
            time.sleep(poll_seconds)