- validate instance names at the host lib and web boundary against
[a-z0-9][a-z0-9_-]{0,63} to prevent path traversal via Server.name
- fail-closed on SECRET_KEY: load_config returns None when env unset,
create_app raises if missing or "dev" outside TESTING
- close login timing oracle by hashing a dummy digest when the user
is not found, equalizing response time
- set SESSION_COOKIE_SECURE outside TESTING
- delete_instance tolerates stop_service and fusermount3 failures so
partially-initialized instances clean up without contract breaks;
drops the is_mount() preflight that violated AGENTS.md
- document claim_next_job's single-process assumption
- clarify emit_step contract via docstring
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
316 lines
11 KiB
Python
316 lines
11 KiB
Python
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from typing import Callable
|
|
|
|
from l4d2web.db import session_scope
|
|
from l4d2web.models import Job, JobLog, Server
|
|
from l4d2web.services.host_commands import CommandCancelledError
|
|
|
|
|
|
# States a finished job is left in (see the values passed to finish_job).
TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
# States of a job that has been claimed and not yet finished; these are the
# jobs counted by build_scheduler_state and reset by recover_stale_jobs.
ACTIVE_JOB_STATES = {"running", "cancelling"}
# Operations that act on a single server and therefore require a server_id.
SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}

# Serialises job claiming between the worker threads of this process.
_claim_lock = threading.Lock()
# Re-entrant: append_job_log_line holds it while calling append_job_log,
# which acquires it again.
_log_lock = threading.RLock()
# Guards the one-time start of the worker threads.
_worker_start_lock = threading.Lock()
_workers_started = False
|
|
|
|
|
|
@dataclass
class SchedulerState:
    """Snapshot of the work that is currently occupying the scheduler.

    Filled in by ``build_scheduler_state`` from the active jobs and read by
    ``can_start`` to decide whether another job may begin.
    """

    # True while an "install" job is active.
    install_running: bool = False
    # IDs of the servers that have an active job; each instance gets its
    # own empty set.
    running_servers: set[int] = field(default_factory=set)
|
|
|
|
|
|
def can_start(job, state: SchedulerState) -> bool:
    """Return whether ``job`` may start given the current scheduler ``state``.

    Rules:
      * nothing starts while an install is running;
      * an install additionally requires that no server has an active job;
      * any other job needs a ``server_id`` whose server has no active job.
    """
    if state.install_running:
        return False
    if job.operation == "install":
        return not state.running_servers
    return job.server_id is not None and job.server_id not in state.running_servers
|
|
|
|
|
|
def build_scheduler_state(session: Session) -> SchedulerState:
    """Derive the scheduler snapshot from the jobs in an active state.

    Every job whose state is in ``ACTIVE_JOB_STATES`` is inspected: an
    "install" job marks the install as running, any other job that has a
    ``server_id`` marks that server as busy.
    """
    active_jobs = session.scalars(select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))).all()

    install_active = False
    busy_servers: set[int] = set()
    for active in active_jobs:
        if active.operation == "install":
            install_active = True
            continue
        if active.server_id is not None:
            busy_servers.add(active.server_id)

    return SchedulerState(install_running=install_active, running_servers=busy_servers)
|
|
|
|
|
|
def claim_next_job() -> int | None:
    """Atomically claim the next runnable job and return its id.

    Queued jobs are examined in ``created_at``, ``id`` order. The first one
    that may start is switched to "running", stamped with the claim time and
    flushed; its id is returned. ``None`` means nothing is runnable now.

    `_claim_lock` is process-local, so correctness depends on a single web
    process. The deployed unit pins `gunicorn --workers 1`; if that ever
    changes, replace this with a row-level lock (`with_for_update`) to keep
    job claims unique across processes.
    """
    with _claim_lock, session_scope() as db:
        state = build_scheduler_state(db)
        queued = db.scalars(
            select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)
        ).all()
        claimed_at = datetime.now(UTC)

        for candidate in queued:
            # A non-install job without a server_id can never satisfy
            # can_start; it is claimed regardless so that run_job can reject
            # it with an error instead of leaving it queued.
            is_malformed = candidate.operation != "install" and candidate.server_id is None
            if is_malformed or can_start(candidate, state):
                candidate.state = "running"
                candidate.started_at = claimed_at
                candidate.updated_at = claimed_at
                candidate.exit_code = None
                db.flush()
                return candidate.id

        return None
|
|
|
|
|
|
def run_worker_once() -> bool:
    """Claim and run at most one job.

    Returns True when a job was claimed and executed, False when there was
    nothing to claim.
    """
    claimed_id = claim_next_job()
    if claimed_id is not None:
        run_job(claimed_id)
        return True
    return False
|
|
|
|
|
|
def run_job(job_id: int) -> None:
    """Execute the job ``job_id`` and record its outcome.

    The job's operation is dispatched to the matching ``l4d2_facade`` call
    ("start" runs initialize first, then start, with a cancellation check in
    between). Output lines are streamed into the job log, truncated to 4096
    characters each. Whatever happens, the job is finished through
    ``finish_job`` as "succeeded", "cancelled" or "failed", and for jobs that
    have a ``server_id`` the server's actual state is refreshed first.

    Returns silently when no job with ``job_id`` exists. Exceptions raised by
    the operation are not propagated; they end up in the job log and in the
    ``error`` passed to ``finish_job``.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from l4d2web.services import l4d2_facade

    server_name = "unknown"
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return
        operation = job.operation
        server_id = job.server_id
        if server_id is not None:
            server = db.scalar(select(Server).where(Server.id == server_id))
            if server is not None:
                server_name = server.name

    max_chars = 4096

    def on_stdout(line: str) -> None:
        append_job_log_line(job_id, "stdout", line, max_chars=max_chars)

    def on_stderr(line: str) -> None:
        append_job_log_line(job_id, "stderr", line, max_chars=max_chars)

    def should_cancel() -> bool:
        # Re-reads the job state on every call so a cancel request made
        # while the job runs is noticed.
        with session_scope() as db:
            state = db.scalar(select(Job.state).where(Job.id == job_id))
            return state == "cancelling"

    def raise_if_cancelled() -> None:
        if should_cancel():
            raise CommandCancelledError(returncode=1, cmd=[operation], output="", stderr="")

    callbacks = {
        "on_stdout": on_stdout,
        "on_stderr": on_stderr,
        "should_cancel": should_cancel,
    }

    def _run_with_boundaries(action: str, target: str, step: Callable, *args, **kwargs):
        # Bracket the step with start/finish markers in the job log.
        append_job_log_line(job_id, "stdout", f"starting {action} for {target}")
        step(*args, **kwargs)
        append_job_log_line(job_id, "stdout", f"finished {action} successfully")

    def _run_server_step(action: str, step: Callable) -> None:
        # Run one facade call against this job's server.
        _run_with_boundaries(action, server_name, step, server_id, **callbacks)

    def _finish(state: str, exit_code: int | None, error: str = "") -> None:
        # Refresh the server's observed state (when there is a server), then
        # record the job's final state.
        if server_id is not None:
            refresh_server_actual_state_after_job(job_id, server_id)
        finish_job(job_id, state, exit_code, error=error)

    try:
        if operation == "install":
            _run_with_boundaries("install", "server", l4d2_facade.install_runtime, **callbacks)
        elif operation in SERVER_OPERATIONS and server_id is None:
            raise ValueError(f"{operation} job has no server_id")
        elif operation == "initialize":
            _run_server_step("initialize", l4d2_facade.initialize_server)
        elif operation == "start":
            _run_server_step("initialize", l4d2_facade.initialize_server)
            raise_if_cancelled()
            _run_server_step("start", l4d2_facade.start_server)
        elif operation == "stop":
            _run_server_step("stop", l4d2_facade.stop_server)
        elif operation == "delete":
            _run_server_step("delete", l4d2_facade.delete_server)
        else:
            raise ValueError(f"unknown job operation: {operation}")

        _finish("succeeded", 0)
    except CommandCancelledError as exc:
        # Must precede the CalledProcessError handler.
        # NOTE(review): CommandCancelledError looks like a subclass of
        # CalledProcessError (same constructor fields) — confirm.
        error = "job cancelled; runtime state may be partial"
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        exit_code = exc.returncode if exc.returncode is not None else 1
        _finish("cancelled", exit_code, error)
    except subprocess.CalledProcessError as exc:
        # stderr is bytes unless the command ran in text mode; normalise it
        # so neither the log nor Server.last_error receives raw bytes or a
        # "b'...'" repr.
        captured = exc.stderr
        if isinstance(captured, (bytes, bytearray)):
            stderr_text = bytes(captured).decode("utf-8", errors="replace")
        else:
            stderr_text = str(captured) if captured else ""
        if stderr_text:
            append_job_log_line(job_id, "stderr", stderr_text, max_chars=max_chars)
        _finish("failed", exc.returncode, stderr_text or str(exc))
    except Exception as exc:
        error = str(exc)
        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
        _finish("failed", 1, error)
|
|
|
|
|
|
def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None:
    """Record the final ``state`` and ``exit_code`` of a job.

    The job's ``finished_at`` and ``updated_at`` are set to the current UTC
    time. When the job belongs to a server that still exists, the server's
    ``last_error`` is cleared on success or set to ``error`` otherwise, and
    its ``updated_at`` is bumped. Does nothing when the job does not exist.
    """
    finished_at = datetime.now(UTC)
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return

        job.state = state
        job.exit_code = exit_code
        job.finished_at = finished_at
        job.updated_at = finished_at

        if job.server_id is None:
            return
        server = db.scalar(select(Server).where(Server.id == job.server_id))
        if server is None:
            return
        server.last_error = error if state != "succeeded" else ""
        server.updated_at = finished_at
|
|
|
|
|
|
def append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int:
    """Append one log line for ``job_id`` in its own session scope.

    Holds ``_log_lock`` for the whole session scope and delegates to
    ``append_job_log``. Returns the sequence number given to the new line.
    """
    with _log_lock, session_scope() as db:
        assigned_seq = append_job_log(db, job_id, stream, line, max_chars=max_chars)
        return assigned_seq
|
|
|
|
|
|
def recover_stale_jobs() -> int:
    """Close out every job that is still in an active state.

    Jobs in "cancelling" become "cancelled"; all other active jobs become
    "failed". Each gets exit code 1 and the current UTC time as
    ``finished_at`` / ``updated_at``. Returns the number of jobs touched.
    """
    recovered_at = datetime.now(UTC)
    with session_scope() as db:
        stale_jobs = db.scalars(select(Job).where(Job.state.in_(ACTIVE_JOB_STATES))).all()
        for stale in stale_jobs:
            if stale.state == "cancelling":
                stale.state = "cancelled"
            else:
                stale.state = "failed"
            stale.exit_code = 1
            stale.finished_at = recovered_at
            stale.updated_at = recovered_at
        return len(stale_jobs)
|
|
|
|
|
|
def append_job_log(
    session: Session,
    job_id: int,
    stream: str,
    line: str,
    max_chars: int = 4096,
) -> int:
    """Add a ``JobLog`` row for ``job_id`` using the caller's ``session``.

    The row's ``seq`` is one more than the highest existing ``seq`` for the
    job (starting at 1), and ``line`` is cut to ``max_chars`` characters.
    The row is flushed but the session is not committed here. Returns the
    sequence number used.
    """
    with _log_lock:
        highest_seq = session.scalar(
            select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)
        )
        next_seq = int(highest_seq or 0) + 1
        entry = JobLog(job_id=job_id, seq=next_seq, stream=stream, line=line[:max_chars])
        session.add(entry)
        session.flush()
        return next_seq
|
|
|
|
|
|
def refresh_server_actual_state(server_id: int) -> str:
    """Query the live status of a server and store it on its row.

    Looks the server up by id, asks ``l4d2_facade.server_status`` for its
    status by name, and writes the reported state together with the refresh
    timestamp. Returns the stored state, or "unknown" when no server with
    ``server_id`` exists.
    """
    from l4d2web.services import l4d2_facade

    refreshed_at = datetime.now(UTC)
    with session_scope() as db:
        server = db.scalar(select(Server).where(Server.id == server_id))
        if server is None:
            return "unknown"

        observed = l4d2_facade.server_status(server.name)
        server.actual_state = observed.state
        server.actual_state_updated_at = refreshed_at
        server.updated_at = refreshed_at
        return server.actual_state
|
|
|
|
|
|
def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None:
    """Refresh a server's actual state without letting a failure escape.

    Any exception from ``refresh_server_actual_state`` is written to the
    stderr log of ``job_id`` instead of being raised, so a failing status
    query cannot change how the job itself is finished.
    """
    try:
        refresh_server_actual_state(server_id)
    except Exception as refresh_error:
        message = f"status refresh failed: {refresh_error}"
        append_job_log_line(job_id, "stderr", message)
|
|
|
|
|
|
def start_job_workers(app) -> None:
    """Start the background job worker threads once per process.

    The first call spawns ``JOB_WORKER_THREADS`` (default 4) daemon threads
    running ``worker_loop`` with ``JOB_WORKER_POLL_SECONDS`` (default 1) as
    the idle poll interval; both values are read from ``app.config``. Later
    calls return immediately.
    """
    global _workers_started

    with _worker_start_lock:
        if _workers_started:
            return
        _workers_started = True

        worker_count = int(app.config.get("JOB_WORKER_THREADS", 4))
        poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1))
        for number in range(1, worker_count + 1):
            worker = threading.Thread(
                target=worker_loop,
                args=(app, poll_seconds),
                name=f"l4d2-job-worker-{number}",
                daemon=True,
            )
            worker.start()
|
|
|
|
|
|
def worker_loop(app, poll_seconds: float) -> None:
    """Run jobs forever inside ``app``'s application context.

    Each iteration tries to claim and run one job. When no job was run, or
    the iteration raised, the loop sleeps ``poll_seconds`` before trying
    again. Exceptions never terminate the loop, but they are logged with
    their traceback so worker failures are visible instead of silently
    discarded.
    """
    import logging

    log = logging.getLogger(__name__)

    while True:
        ran_job = False
        try:
            with app.app_context():
                ran_job = run_worker_once()
        except Exception:
            # Best effort: keep the worker alive, but leave a trace.
            log.exception("job worker iteration failed; retrying in %s s", poll_seconds)
            ran_job = False
        if not ran_job:
            time.sleep(poll_seconds)
|