left4me/l4d2web/services/job_worker.py

309 lines
10 KiB
Python

from dataclasses import dataclass, field
from datetime import UTC, datetime
import subprocess
import threading
import time
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from typing import Callable
from l4d2web.db import session_scope
from l4d2web.models import Job, JobLog, Server
from l4d2web.services.host_commands import CommandCancelledError
# Job states that mean the job is over and will not run again.
TERMINAL_JOB_STATES = {"cancelled", "failed", "succeeded"}
# Job states that mean a worker currently owns the job; "cancelling" is a
# running job whose cancellation has been requested (presumably set by a
# cancel endpoint elsewhere — it is only read in this module).
ACTIVE_JOB_STATES = {"cancelling", "running"}
# Operations that act on a single server and therefore need Job.server_id.
SERVER_OPERATIONS = {"delete", "initialize", "start", "stop"}

# Serialises claim_next_job between worker threads of this process.
_claim_lock = threading.Lock()
# Serialises job-log sequence allocation. Re-entrant because
# append_job_log_line holds it while calling append_job_log, which takes it again.
_log_lock = threading.RLock()
# Guards the one-time start of the worker threads in start_job_workers.
_worker_start_lock = threading.Lock()
_workers_started = False
@dataclass
class SchedulerState:
    """Snapshot of what the job scheduler currently considers busy.

    Filled in by ``build_scheduler_state`` from the active jobs and consulted
    by ``can_start`` when deciding whether a queued job may be claimed.
    """

    # True while an "install" job is active.
    install_running: bool = field(default=False)
    # Server ids that currently have an active (non-install) job.
    running_servers: set[int] = field(default_factory=set)
def can_start(job, state: SchedulerState) -> bool:
    """Return whether *job* may be claimed given the scheduler *state*.

    An ``install`` job needs the whole host, so nothing else may be active.
    Any other job is blocked while an install is active, must name a server,
    and may not overlap another active job on that same server.
    """
    if job.operation != "install":
        blocked = state.install_running or job.server_id is None
        return not blocked and job.server_id not in state.running_servers
    return not state.install_running and not state.running_servers
def build_scheduler_state(session: Session) -> SchedulerState:
    """Summarise the jobs in ACTIVE_JOB_STATES into a SchedulerState.

    Any active ``install`` job marks the host as busy. Every other active job
    that has a server id marks that server as busy.
    """
    active_jobs = session.scalars(
        select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))
    ).all()
    return SchedulerState(
        install_running=any(j.operation == "install" for j in active_jobs),
        running_servers={
            j.server_id
            for j in active_jobs
            if j.operation != "install" and j.server_id is not None
        },
    )
def claim_next_job() -> int | None:
    """Mark the oldest runnable queued job as running and return its id.

    Queued jobs are scanned oldest first (created_at, then id). A job is
    claimed when ``can_start`` allows it. A job that needs a server but has
    none is also claimed, because ``can_start`` would otherwise reject it
    forever; ``run_job`` then fails it. Returns None when nothing is
    runnable.

    NOTE(review): ``_claim_lock`` only serialises threads in this process;
    confirm that a single process runs workers against the database.
    """
    with _claim_lock, session_scope() as db:
        scheduler = build_scheduler_state(db)
        queued = db.scalars(
            select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)
        ).all()
        claimed_at = datetime.now(UTC)
        for candidate in queued:
            missing_server = candidate.operation != "install" and candidate.server_id is None
            if missing_server or can_start(candidate, scheduler):
                candidate.state = "running"
                candidate.started_at = claimed_at
                candidate.updated_at = claimed_at
                candidate.exit_code = None
                db.flush()
                return candidate.id
    return None
def run_worker_once() -> bool:
    """Claim and run at most one queued job.

    Returns True if a job was claimed and ``run_job`` returned for it, and
    False if no job was runnable.
    """
    claimed = claim_next_job()
    if claimed is not None:
        run_job(claimed)
    return claimed is not None
def run_job(job_id: int) -> None:
    """Execute a claimed job and record its outcome.

    Loads the job (and its server's name, for log messages) and dispatches on
    ``job.operation`` to the matching ``l4d2_facade`` call(s). Process output
    is streamed into the job log. When the job targets a server, the server's
    actual state is refreshed afterwards. The job is then marked succeeded,
    cancelled or failed via ``finish_job``. Returns silently if the job no
    longer exists.

    Failures of the operation itself are recorded on the job, not raised.
    Errors from the initial lookup, or raised while recording the outcome
    (e.g. database errors), propagate to the caller.
    """
    # Imported lazily, presumably to avoid an import cycle with the facade.
    from l4d2web.services import l4d2_facade

    server_name = "unknown"
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        # Copy what we need out of the ORM object while the session is open.
        operation = job.operation
        server_id = job.server_id
        if server_id is not None:
            server = db.scalar(select(Server).where(Server.id == server_id))
            if server is not None:
                server_name = server.name

    max_chars = 4096

    def on_stdout(line: str) -> None:
        append_job_log_line(job_id, "stdout", line, max_chars=max_chars)

    def on_stderr(line: str) -> None:
        append_job_log_line(job_id, "stderr", line, max_chars=max_chars)

    def should_cancel() -> bool:
        # Handed to the facade and also checked between steps; a job in the
        # "cancelling" state is asked to stop.
        with session_scope() as db:
            state = db.scalar(select(Job.state).where(Job.id == job_id))
            return state == "cancelling"

    def raise_if_cancelled() -> None:
        if should_cancel():
            raise CommandCancelledError(returncode=1, cmd=[operation], output="", stderr="")

    def run_step(action: str, target: str, action_fn: Callable, *args) -> None:
        # Brackets one facade call with start/finish markers in the job log.
        # (Named action_fn so it does not shadow sqlalchemy's ``func``.)
        append_job_log_line(job_id, "stdout", f"starting {action} for {target}")
        action_fn(
            *args,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=should_cancel,
        )
        append_job_log_line(job_id, "stdout", f"finished {action} successfully")

    try:
        if operation == "install":
            run_step("install", "server", l4d2_facade.install_runtime)
        elif operation in SERVER_OPERATIONS and server_id is None:
            raise ValueError(f"{operation} job has no server_id")
        elif operation == "initialize":
            run_step("initialize", server_name, l4d2_facade.initialize_server, server_id)
        elif operation == "start":
            # "start" initializes the server first, then starts it, with a
            # cancellation check in between.
            run_step("initialize", server_name, l4d2_facade.initialize_server, server_id)
            raise_if_cancelled()
            run_step("start", server_name, l4d2_facade.start_server, server_id)
        elif operation == "stop":
            run_step("stop", server_name, l4d2_facade.stop_server, server_id)
        elif operation == "delete":
            run_step("delete", server_name, l4d2_facade.delete_server, server_id)
        else:
            raise ValueError(f"unknown job operation: {operation}")
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "succeeded", 0)
    except CommandCancelledError as exc:
        error = "job cancelled; runtime state may be partial"
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        exit_code = exc.returncode if exc.returncode is not None else 1
        finish_job(job_id, "cancelled", exit_code, error=error)
    except subprocess.CalledProcessError as exc:
        # CalledProcessError.stderr is bytes unless the command ran in text
        # mode; decode it so the job log and Server.last_error receive text
        # rather than bytes or a "b'...'" repr.
        stderr = exc.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        error = stderr or str(exc)
        if stderr:
            append_job_log_line(job_id, "stderr", str(stderr), max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "failed", exc.returncode, error=error)
    except Exception as exc:
        # Fall back to the exception type so a message-less exception does
        # not leave a failed job with an empty last_error (which is what a
        # successful job leaves).
        error = str(exc) or type(exc).__name__
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, "failed", 1, error=error)
def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None:
    """Move a job into its final *state* and mirror the outcome onto its server.

    Sets the job's state, exit code and finished/updated timestamps. If the
    job references an existing server, that server's ``last_error`` is cleared
    when *state* is "succeeded" and set to *error* otherwise. A missing job is
    ignored.
    """
    finished_at = datetime.now(UTC)
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        job.state, job.exit_code = state, exit_code
        job.finished_at = job.updated_at = finished_at
        server = (
            None
            if job.server_id is None
            else db.scalar(select(Server).where(Server.id == job.server_id))
        )
        if server is None:
            return
        server.last_error = error if state != "succeeded" else ""
        server.updated_at = finished_at
def append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int:
    """Append one log line to a job using its own short-lived session.

    ``_log_lock`` is held for the whole session, including whatever
    ``session_scope`` does on exit (presumably a commit), so threads in this
    process that log through here allocate sequence numbers one at a time.
    Returns the sequence number assigned to the line.
    """
    with _log_lock, session_scope() as db:
        return append_job_log(db, job_id, stream, line, max_chars=max_chars)
def recover_stale_jobs() -> int:
    """Close out every job still in an active state.

    Jobs in "cancelling" become "cancelled" and jobs in "running" become
    "failed"; all get exit code 1 and a finished timestamp. Presumably called
    at startup, when no worker can still own these jobs — confirm with the
    caller. Returns the number of jobs updated.
    """
    finished_at = datetime.now(UTC)
    with session_scope() as db:
        stale = db.scalars(select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))).all()
        for stale_job in stale:
            was_cancelling = stale_job.state == "cancelling"
            stale_job.state = "cancelled" if was_cancelling else "failed"
            stale_job.exit_code = 1
            stale_job.finished_at = finished_at
            stale_job.updated_at = finished_at
    return len(stale)
def append_job_log(
    session: Session,
    job_id: int,
    stream: str,
    line: str,
    max_chars: int = 4096,
) -> int:
    """Add a JobLog row with the next sequence number for *job_id*.

    The sequence number is one more than the job's current maximum (1 for
    the first line). *line* is truncated to *max_chars* characters. The row
    is flushed, not committed. Returns the sequence number used.

    NOTE(review): the lock here only spans the max()/insert/flush. When a
    caller passes its own session, the commit happens after the lock is
    released, so a concurrent writer could read the same max(seq).
    ``append_job_log_line`` avoids this by holding the lock for the whole
    session.
    """
    with _log_lock:
        current_max = session.scalar(
            select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)
        )
        seq = int(current_max or 0) + 1
        session.add(JobLog(job_id=job_id, seq=seq, stream=stream, line=line[:max_chars]))
        session.flush()
    return seq
def refresh_server_actual_state(server_id: int) -> str:
    """Query a server's live status and store it as its ``actual_state``.

    Also stamps ``actual_state_updated_at`` and ``updated_at``. Returns the
    stored state, or "unknown" without querying anything when the server row
    does not exist.
    """
    from l4d2web.services import l4d2_facade

    refreshed_at = datetime.now(UTC)
    with session_scope() as db:
        server = db.scalar(select(Server).where(Server.id == server_id))
        if server is None:
            return "unknown"
        live_state = l4d2_facade.server_status(server.name).state
        server.actual_state = live_state
        server.actual_state_updated_at = refreshed_at
        server.updated_at = refreshed_at
        return server.actual_state
def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None:
    """Refresh a server's actual state after a job, best effort.

    Any exception from the refresh is written to the job's stderr log instead
    of propagating. Errors from writing that log line still propagate.
    """
    try:
        refresh_server_actual_state(server_id)
    except Exception as refresh_error:
        message = f"status refresh failed: {refresh_error}"
        append_job_log_line(job_id, "stderr", message)
def start_job_workers(app) -> None:
    """Start the background job worker threads, once per process.

    Reads ``JOB_WORKER_THREADS`` (default 4) and ``JOB_WORKER_POLL_SECONDS``
    (default 1) from ``app.config``. It then starts that many daemon threads,
    each running ``worker_loop(app, poll_seconds)``. Calls after a successful
    start are no-ops.

    Raises:
        ValueError/TypeError: if a config value cannot be converted. In that
        case no workers are started and a later call may try again.
    """
    global _workers_started
    with _worker_start_lock:
        if _workers_started:
            return
        # Parse configuration before setting the "started" flag. Otherwise a
        # bad value would leave the flag set with no threads running, and
        # every later call would silently return without starting workers.
        threads = int(app.config.get("JOB_WORKER_THREADS", 4))
        poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1))
        _workers_started = True
        for index in range(threads):
            thread = threading.Thread(
                target=worker_loop,
                args=(app, poll_seconds),
                name=f"l4d2-job-worker-{index + 1}",
                daemon=True,
            )
            thread.start()
def worker_loop(app, poll_seconds: float) -> None:
    """Run queued jobs forever inside *app*'s application context.

    Jobs run back-to-back while work is available. The loop sleeps
    *poll_seconds* when nothing was runnable or an iteration raised. Errors
    are logged and the loop continues, so a single failure does not kill the
    worker thread.
    """
    import logging

    logger = logging.getLogger(__name__)
    while True:
        try:
            with app.app_context():
                ran_job = run_worker_once()
        except Exception:
            # Log instead of swallowing silently: otherwise worker failures
            # (e.g. database errors, or a job left "running") leave no trace.
            logger.exception("job worker iteration failed")
            ran_job = False
        if not ran_job:
            time.sleep(poll_seconds)