From 91d042cf3356d2a761b0f3af4991a057d0d240fc Mon Sep 17 00:00:00 2001 From: mwiegand Date: Wed, 6 May 2026 14:08:18 +0200 Subject: [PATCH] feat(l4d2-web): execute queued lifecycle jobs --- .../plans/2026-05-06-l4d2-web-queue-worker.md | 278 +++++++++++++++ l4d2web/app.py | 4 +- l4d2web/config.py | 2 + l4d2web/routes/job_routes.py | 43 ++- l4d2web/routes/page_routes.py | 13 +- l4d2web/services/job_worker.py | 177 +++++++++- l4d2web/static/js/sse.js | 17 +- l4d2web/templates/admin.html | 9 + l4d2web/tests/test_job_logs.py | 39 +- l4d2web/tests/test_job_worker.py | 333 ++++++++++++++++-- l4d2web/tests/test_pages.py | 38 +- 11 files changed, 891 insertions(+), 62 deletions(-) create mode 100644 docs/superpowers/plans/2026-05-06-l4d2-web-queue-worker.md diff --git a/docs/superpowers/plans/2026-05-06-l4d2-web-queue-worker.md b/docs/superpowers/plans/2026-05-06-l4d2-web-queue-worker.md new file mode 100644 index 0000000..3041443 --- /dev/null +++ b/docs/superpowers/plans/2026-05-06-l4d2-web-queue-worker.md @@ -0,0 +1,278 @@ +# L4D2 Web Queue Worker Implementation Plan + +> **Approval gate:** This plan may be written and refined without further approval. Do not implement code changes from this plan until the user explicitly approves implementation. + +**Goal:** Complete the `l4d2web` async lifecycle queue so queued jobs are claimed, executed through the direct `l4d2host` Python APIs, logged to `job_logs`, reflected in server state, and streamed live to the UI. + +**Architecture:** Keep the v1 single-process Flask architecture. Use DB-backed queued jobs as the durable source of truth, worker threads inside the Flask process, SQLite-safe process-local locks, and direct imports through `l4d2web.services.l4d2_facade`. Do not shell out to `l4d2ctl` from the web app. + +--- + +## Current Gap + +- Server lifecycle routes create `Job(state="queued")` rows. +- `l4d2web.services.job_worker` has scheduler helpers, stale recovery, command-log append, and actual-state refresh helpers. 
+- No worker claims queued jobs. +- No code dispatches queued operations to `l4d2_facade`. +- No command callbacks persist live stdout/stderr while jobs run. +- Job-log SSE currently replays existing rows once and does not live-follow new rows. +- Job-log SSE emits `stdout`/`stderr` custom events, while `static/js/sse.js` only handles default messages. +- No web route currently enqueues global `install` jobs. + +--- + +## Locked Decisions + +- Queue execution uses direct Python imports through `l4d2web.services.l4d2_facade`. +- The queue is DB-backed, not an in-memory `queue.Queue`. +- Worker threads are in-process daemon threads. +- SQLite concurrency is protected with process-local locks; no distributed lock manager is added. +- Workers are not started during normal tests. +- `POST /admin/install` is added as the admin-only runtime install/update entry point. +- `install` jobs have `server_id=None` and are globally exclusive. +- Server-specific jobs do not overlap on the same `server_id`. +- Different server jobs can run concurrently when no install job is running. +- A web `start` job applies the live-linked blueprint before start by running `initialize_server(server_id)` and then `start_server(server_id)`. This satisfies “blueprint updates apply on next action.” +- `delete` removes the host instance/runtime through `l4d2host`; it does not delete the web `Server` row in v1. +- Command log rows are retained indefinitely. + +--- + +## Task 1: Extend Worker Tests First + +**Files:** +- Modify: `l4d2web/tests/test_job_worker.py` +- Modify as needed: `l4d2web/tests/test_job_logs.py` + +Add tests that verify the worker behavior without touching real systemd, Steam, or `/opt/l4d2`. Use monkeypatched `l4d2web.services.l4d2_facade` functions. + +Required coverage: + +- `run_worker_once()` claims the oldest runnable queued job. +- A successful server job transitions `queued -> running -> succeeded` and sets `exit_code=0`, `started_at`, `finished_at`, and `updated_at`. 
+- A successful job persists stdout/stderr callback lines in `job_logs`. +- A `subprocess.CalledProcessError` transitions the job to `failed` and stores `exit_code=exc.returncode`. +- An unexpected exception transitions the job to `failed` with `exit_code=1`. +- Same-server jobs do not overlap. +- Different-server jobs can be claimed concurrently by separate worker passes. +- An `install` job is not claimed while any server job is running. +- Server jobs are not claimed while an `install` job is running. +- Startup recovery marks stale `running` jobs as `failed`. +- Actual server state is refreshed after server-specific lifecycle jobs. +- `Server.last_error` is cleared on success and set on failure. + +Verification command: + +```bash +pytest l4d2web/tests/test_job_worker.py -q +``` + +Expected before implementation: FAIL. + +--- + +## Task 2: Implement Queue Claiming And Job Execution + +**Files:** +- Modify: `l4d2web/services/job_worker.py` + +Add worker-core functions: + +- `build_scheduler_state(session) -> SchedulerState` +- `claim_next_job() -> int | None` +- `run_worker_once() -> bool` +- `run_job(job_id: int) -> None` +- `finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None` +- `append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int` + +Implementation rules: + +- Use a module-level claim lock around scheduler-state construction, queued-job selection, and `queued -> running` transition. +- Commit the `running` transition before executing any host operation. +- Do not keep a DB session open while a host operation runs. +- Use a module-level log lock around `append_job_log()` so concurrent stdout/stderr callback threads cannot duplicate `seq` values. +- Recompute scheduler state from `running` jobs in the DB, not from only in-memory state. +- Select queued jobs by `created_at`, then `id` for deterministic order. +- Skip malformed server operations with no `server_id` by failing the job cleanly. 
+- Treat unknown operations as failed jobs, not worker-thread crashes. + +Operation dispatch: + +```text +install -> l4d2_facade.install_runtime(...) +initialize -> l4d2_facade.initialize_server(server_id, ...) +start -> l4d2_facade.initialize_server(server_id, ...), then l4d2_facade.start_server(server_id, ...) +stop -> l4d2_facade.stop_server(server_id, ...) +delete -> l4d2_facade.delete_server(server_id, ...) +``` + +Failure handling: + +- `subprocess.CalledProcessError`: append remaining stderr if useful, fail with `exit_code=returncode`. +- Any other exception: append exception text to stderr, fail with `exit_code=1`. +- Never let a job exception kill the worker loop. + +Verification command: + +```bash +pytest l4d2web/tests/test_job_worker.py -q +``` + +Expected after implementation: PASS. + +--- + +## Task 3: Add Worker Thread Startup + +**Files:** +- Modify: `l4d2web/config.py` +- Modify: `l4d2web/app.py` +- Modify: `l4d2web/services/job_worker.py` +- Modify: `l4d2web/tests/test_job_worker.py` + +Add config: + +```python +"JOB_WORKER_ENABLED": True +"JOB_WORKER_POLL_SECONDS": 1 +``` + +Add worker lifecycle functions: + +- `start_job_workers(app) -> None` +- `worker_loop(app, poll_seconds: float) -> None` + +Startup behavior: + +- `create_app()` still calls `recover_stale_jobs()`. +- After recovery, `create_app()` starts workers only when enabled and not in `TESTING`. +- Guard against duplicate worker startup in the same process. +- Worker threads run as daemon threads. +- Each worker loop uses `app.app_context()` around `run_worker_once()`. +- If no job was run, sleep for `JOB_WORKER_POLL_SECONDS`. + +Testing requirements: + +- Tests should not accidentally start real background workers. +- Add a focused startup test with monkeypatched `start_job_workers` if needed. 
+ +Verification command: + +```bash +pytest l4d2web/tests/test_job_worker.py -q +``` + +--- + +## Task 4: Make Job Log SSE Live-Follow + +**Files:** +- Modify: `l4d2web/routes/job_routes.py` +- Modify: `l4d2web/static/js/sse.js` +- Modify: `l4d2web/tests/test_job_logs.py` + +Route behavior: + +- Authorize the job before streaming. +- Replay rows with `seq > last_seq` up to `JOB_LOG_REPLAY_LIMIT`. +- Continue polling for new rows while the job is not terminal. +- Close the stream after all available logs are sent and the job state is terminal. +- Keep emitting `id: ` so EventSource can resume. +- Keep `event: stdout` and `event: stderr` for job logs. + +JS behavior: + +- Keep handling default server-log messages via `source.onmessage`. +- Also register `stdout` and `stderr` listeners that append job-log lines to the same element. +- Prefix custom job-log events with the stream name only if useful for readability. + +Terminal states: + +```text +succeeded +failed +cancelled +``` + +`cancelled` is reserved for future use and does not require cancellation support in this task. + +Verification command: + +```bash +pytest l4d2web/tests/test_job_logs.py -q +``` + +--- + +## Task 5: Add Admin Runtime Install Action + +**Files:** +- Modify: `l4d2web/routes/page_routes.py` +- Modify: `l4d2web/templates/admin.html` +- Modify: `l4d2web/tests/test_pages.py` or add a focused admin route test + +Behavior: + +- `POST /admin/install` requires `@require_admin`. +- Creates `Job(user_id=current_admin.id, server_id=None, operation="install", state="queued")`. +- Redirects to `/admin/jobs`. +- Non-admin logged-in users receive `403`. +- Anonymous users are redirected to login. +- Admin page shows a CSRF-protected form/button for runtime install/update. 
+ +Verification command: + +```bash +pytest l4d2web/tests/test_pages.py -q +``` + +--- + +## Task 6: Full Verification And Review + +Run focused suites first: + +```bash +pytest l4d2web/tests/test_job_worker.py -q +pytest l4d2web/tests/test_job_logs.py -q +pytest l4d2web/tests/test_pages.py -q +``` + +Then run the full web suite: + +```bash +pytest l4d2web/tests -q +``` + +Refresh the code index after implementation: + +```bash +ccc index +``` + +Request a final read-only review focused on: + +- queue claiming races +- duplicate worker startup +- job-log sequence ordering +- error handling and `last_error` +- live SSE behavior +- `start` applying blueprint updates before host start + +--- + +## Commit Strategy + +Use small commits after passing relevant tests: + +1. `feat(l4d2-web): execute queued lifecycle jobs` +2. `feat(l4d2-web): live-follow queued job logs` +3. `feat(l4d2-web): add admin runtime install job` + +Do not commit unless the user explicitly asks for commits. + +--- + +## Open Approval Gate + +Before modifying implementation files, ask the user for explicit approval to proceed with the queue-worker implementation. 
diff --git a/l4d2web/app.py b/l4d2web/app.py index 89fd3be..898360d 100644 --- a/l4d2web/app.py +++ b/l4d2web/app.py @@ -15,7 +15,7 @@ from l4d2web.routes.log_routes import bp as log_bp from l4d2web.routes.overlay_routes import bp as overlay_bp from l4d2web.routes.page_routes import bp as page_bp from l4d2web.routes.server_routes import bp as server_bp -from l4d2web.services.job_worker import recover_stale_jobs +from l4d2web.services.job_worker import recover_stale_jobs, start_job_workers def create_app(test_config: dict[str, object] | None = None) -> Flask: @@ -63,6 +63,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: if app.config.get("TESTING"): reset_login_rate_limits() recover_stale_jobs() + if app.config.get("JOB_WORKER_ENABLED") and not app.config.get("TESTING"): + start_job_workers(app) @app.get("/health") def health(): diff --git a/l4d2web/config.py b/l4d2web/config.py index ad7ea2e..2861208 100644 --- a/l4d2web/config.py +++ b/l4d2web/config.py @@ -3,6 +3,8 @@ DEFAULT_CONFIG: dict[str, object] = { "DATABASE_URL": "sqlite:///l4d2web.db", "STATUS_REFRESH_SECONDS": 8, "JOB_WORKER_THREADS": 4, + "JOB_WORKER_ENABLED": True, + "JOB_WORKER_POLL_SECONDS": 1, "JOB_LOG_REPLAY_LIMIT": 2000, "JOB_LOG_LINE_MAX_CHARS": 4096, } diff --git a/l4d2web/routes/job_routes.py b/l4d2web/routes/job_routes.py index 4fdcd22..163f863 100644 --- a/l4d2web/routes/job_routes.py +++ b/l4d2web/routes/job_routes.py @@ -1,3 +1,5 @@ +import time + from flask import Blueprint, Response, current_app, request from sqlalchemy import select @@ -7,6 +9,14 @@ from l4d2web.models import Job, JobLog bp = Blueprint("job", __name__) +TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"} + + +def format_sse_event(seq: int, event: str, data: str) -> str: + lines = [f"id: {seq}", f"event: {event}"] + for line in data.splitlines() or [""]: + lines.append(f"data: {line}") + return "\n".join(lines) + "\n\n" @bp.get("/jobs//stream") @@ -15,8 +25,9 @@ def stream_job(job_id: 
int) -> Response: user = current_user() assert user is not None - last_seq = int(request.args.get("last_seq", "0")) + last_seq = int(request.args.get("last_seq") or request.headers.get("Last-Event-ID") or "0") limit = int(current_app.config["JOB_LOG_REPLAY_LIMIT"]) + poll_seconds = float(current_app.config.get("JOB_WORKER_POLL_SECONDS", 1)) with session_scope() as db: job = db.scalar(select(Job).where(Job.id == job_id, Job.user_id == user.id)) @@ -24,16 +35,26 @@ def stream_job(job_id: int) -> Response: return Response(status=404) def generate(): - with session_scope() as db: - rows = db.scalars( - select(JobLog) - .where(JobLog.job_id == job_id, JobLog.seq > last_seq) - .order_by(JobLog.seq) - .limit(limit) - ).all() + next_seq = last_seq + while True: + with session_scope() as db: + job = db.scalar(select(Job).where(Job.id == job_id)) + if job is None: + return + rows = db.scalars( + select(JobLog) + .where(JobLog.job_id == job_id, JobLog.seq > next_seq) + .order_by(JobLog.seq) + .limit(limit) + ).all() + terminal = job.state in TERMINAL_JOB_STATES + for row in rows: - yield f"id: {row.seq}\n" - yield f"event: {row.stream}\n" - yield f"data: {row.line}\n\n" + next_seq = row.seq + yield format_sse_event(row.seq, row.stream, row.line) + + if terminal and len(rows) < limit: + return + time.sleep(poll_seconds) return Response(generate(), mimetype="text/event-stream") diff --git a/l4d2web/routes/page_routes.py b/l4d2web/routes/page_routes.py index a2b4917..b6f6817 100644 --- a/l4d2web/routes/page_routes.py +++ b/l4d2web/routes/page_routes.py @@ -1,6 +1,6 @@ import json -from flask import Blueprint, Response, current_app, render_template +from flask import Blueprint, Response, redirect, render_template from sqlalchemy import select from l4d2web.auth import current_user, require_admin, require_login @@ -24,6 +24,16 @@ def admin_home() -> str: return render_template("admin.html") +@bp.post("/admin/install") +@require_admin +def enqueue_runtime_install() -> Response: + 
user = current_user() + assert user is not None + with session_scope() as db: + db.add(Job(user_id=user.id, server_id=None, operation="install", state="queued")) + return redirect("/admin/jobs") + + @bp.get("/admin/users") @require_admin def admin_users() -> str: @@ -151,4 +161,3 @@ def blueprint_page(blueprint_id: int): arguments=json.loads(blueprint.arguments), config_lines=json.loads(blueprint.config), ) - diff --git a/l4d2web/services/job_worker.py b/l4d2web/services/job_worker.py index d762e98..346cfb7 100644 --- a/l4d2web/services/job_worker.py +++ b/l4d2web/services/job_worker.py @@ -1,5 +1,8 @@ from dataclasses import dataclass, field from datetime import UTC, datetime +import subprocess +import threading +import time from sqlalchemy import func, select from sqlalchemy.orm import Session @@ -8,6 +11,15 @@ from l4d2web.db import session_scope from l4d2web.models import Job, JobLog, Server +TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"} +SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"} + +_claim_lock = threading.Lock() +_log_lock = threading.RLock() +_worker_start_lock = threading.Lock() +_workers_started = False + + @dataclass class SchedulerState: install_running: bool = False @@ -24,12 +36,128 @@ def can_start(job, state: SchedulerState) -> bool: return job.server_id not in state.running_servers +def build_scheduler_state(session: Session) -> SchedulerState: + state = SchedulerState() + running_jobs = session.scalars(select(Job).where(Job.state == "running")).all() + for job in running_jobs: + if job.operation == "install": + state.install_running = True + elif job.server_id is not None: + state.running_servers.add(job.server_id) + return state + + +def claim_next_job() -> int | None: + with _claim_lock: + with session_scope() as db: + state = build_scheduler_state(db) + jobs = db.scalars(select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)).all() + now = datetime.now(UTC) + for job in jobs: + 
malformed_server_job = job.operation != "install" and job.server_id is None + if not malformed_server_job and not can_start(job, state): + continue + + job.state = "running" + job.started_at = now + job.updated_at = now + job.exit_code = None + db.flush() + return job.id + return None + + +def run_worker_once() -> bool: + job_id = claim_next_job() + if job_id is None: + return False + run_job(job_id) + return True + + +def run_job(job_id: int) -> None: + from l4d2web.services import l4d2_facade + + with session_scope() as db: + job = db.scalar(select(Job).where(Job.id == job_id)) + if job is None: + return + operation = job.operation + server_id = job.server_id + + max_chars = 4096 + + def on_stdout(line: str) -> None: + append_job_log_line(job_id, "stdout", line, max_chars=max_chars) + + def on_stderr(line: str) -> None: + append_job_log_line(job_id, "stderr", line, max_chars=max_chars) + + try: + if operation == "install": + l4d2_facade.install_runtime(on_stdout=on_stdout, on_stderr=on_stderr) + elif operation in SERVER_OPERATIONS and server_id is None: + raise ValueError(f"{operation} job has no server_id") + elif operation == "initialize": + l4d2_facade.initialize_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr) + elif operation == "start": + l4d2_facade.initialize_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr) + l4d2_facade.start_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr) + elif operation == "stop": + l4d2_facade.stop_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr) + elif operation == "delete": + l4d2_facade.delete_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr) + else: + raise ValueError(f"unknown job operation: {operation}") + + if server_id is not None: + refresh_server_actual_state_after_job(job_id, server_id) + finish_job(job_id, "succeeded", 0) + except subprocess.CalledProcessError as exc: + error = exc.stderr or str(exc) + if exc.stderr: + append_job_log_line(job_id, "stderr", 
str(exc.stderr), max_chars=max_chars) + if server_id is not None: + refresh_server_actual_state_after_job(job_id, server_id) + finish_job(job_id, "failed", exc.returncode, error=error) + except Exception as exc: + error = str(exc) + append_job_log_line(job_id, "stderr", error, max_chars=max_chars) + if server_id is not None: + refresh_server_actual_state_after_job(job_id, server_id) + finish_job(job_id, "failed", 1, error=error) + + +def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None: + now = datetime.now(UTC) + with session_scope() as db: + job = db.scalar(select(Job).where(Job.id == job_id)) + if job is None: + return + job.state = state + job.exit_code = exit_code + job.finished_at = now + job.updated_at = now + if job.server_id is not None: + server = db.scalar(select(Server).where(Server.id == job.server_id)) + if server is not None: + server.last_error = "" if state == "succeeded" else error + server.updated_at = now + + +def append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int: + with _log_lock: + with session_scope() as db: + return append_job_log(db, job_id, stream, line, max_chars=max_chars) + + def recover_stale_jobs() -> int: now = datetime.now(UTC) with session_scope() as db: jobs = db.scalars(select(Job).where(Job.state == "running")).all() for job in jobs: job.state = "failed" + job.exit_code = 1 job.finished_at = now job.updated_at = now return len(jobs) @@ -42,11 +170,12 @@ def append_job_log( line: str, max_chars: int = 4096, ) -> int: - last_seq = session.scalar(select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)) or 0 - next_seq = int(last_seq) + 1 - session.add(JobLog(job_id=job_id, seq=next_seq, stream=stream, line=line[:max_chars])) - session.flush() - return next_seq + with _log_lock: + last_seq = session.scalar(select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)) or 0 + next_seq = int(last_seq) + 1 + session.add(JobLog(job_id=job_id, seq=next_seq, 
stream=stream, line=line[:max_chars])) + session.flush() + return next_seq def refresh_server_actual_state(server_id: int) -> str: @@ -62,3 +191,41 @@ def refresh_server_actual_state(server_id: int) -> str: server.actual_state_updated_at = now server.updated_at = now return server.actual_state + + +def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None: + try: + refresh_server_actual_state(server_id) + except Exception as exc: + append_job_log_line(job_id, "stderr", f"status refresh failed: {exc}") + + +def start_job_workers(app) -> None: + global _workers_started + + with _worker_start_lock: + if _workers_started: + return + _workers_started = True + threads = int(app.config.get("JOB_WORKER_THREADS", 4)) + poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1)) + for index in range(threads): + thread = threading.Thread( + target=worker_loop, + args=(app, poll_seconds), + name=f"l4d2-job-worker-{index + 1}", + daemon=True, + ) + thread.start() + + +def worker_loop(app, poll_seconds: float) -> None: + while True: + ran_job = False + try: + with app.app_context(): + ran_job = run_worker_once() + except Exception: + ran_job = False + if not ran_job: + time.sleep(poll_seconds) diff --git a/l4d2web/static/js/sse.js b/l4d2web/static/js/sse.js index f023ba3..8d94666 100644 --- a/l4d2web/static/js/sse.js +++ b/l4d2web/static/js/sse.js @@ -5,10 +5,23 @@ function streamTextToElement(element) { } const source = new EventSource(url); - source.onmessage = (event) => { - element.textContent += `${event.data}\n`; + + const appendLine = (line) => { + element.textContent += `${line}\n`; element.scrollTop = element.scrollHeight; }; + + source.onmessage = (event) => { + appendLine(event.data); + }; + + source.addEventListener("stdout", (event) => { + appendLine(event.data); + }); + + source.addEventListener("stderr", (event) => { + appendLine(`[stderr] ${event.data}`); + }); } document.addEventListener("DOMContentLoaded", () => { diff --git 
a/l4d2web/templates/admin.html b/l4d2web/templates/admin.html index bc9ffad..c7541aa 100644 --- a/l4d2web/templates/admin.html +++ b/l4d2web/templates/admin.html @@ -10,4 +10,13 @@
  • Jobs
  • + +
    +

    Runtime

    +

    Queue a Steam runtime install/update job for the local host.

    +
    + + +
    +
    {% endblock %} diff --git a/l4d2web/tests/test_job_logs.py b/l4d2web/tests/test_job_logs.py index cb88b55..0982b19 100644 --- a/l4d2web/tests/test_job_logs.py +++ b/l4d2web/tests/test_job_logs.py @@ -1,3 +1,5 @@ +from pathlib import Path + from sqlalchemy import text import pytest @@ -21,7 +23,7 @@ def seeded_job_logs(tmp_path, monkeypatch): session.add(user) session.flush() - job = Job(user_id=user.id, server_id=None, operation="install", state="queued") + job = Job(user_id=user.id, server_id=None, operation="install", state="succeeded") session.add(job) session.flush() @@ -67,3 +69,38 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None: response = client.get(f"/jobs/{job_id}/stream?last_seq=5") assert response.status_code == 200 + + +def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None: + app, job_id, user_id = seeded_job_logs + client = app.test_client() + with client.session_transaction() as sess: + sess["user_id"] = user_id + + response = client.get(f"/jobs/{job_id}/stream?last_seq=5") + text = response.get_data(as_text=True) + + assert "id: 6\n" in text + assert "event: stdout\n" in text + assert "data: line-6\n\n" in text + assert "data: line-5\n\n" not in text + + +def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None: + app, job_id, user_id = seeded_job_logs + client = app.test_client() + with client.session_transaction() as sess: + sess["user_id"] = user_id + + response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"}) + text = response.get_data(as_text=True) + + assert "data: line-7\n\n" in text + assert "data: line-6\n\n" not in text + + +def test_sse_js_handles_job_log_custom_events() -> None: + js = Path("l4d2web/static/js/sse.js").read_text() + + assert 'addEventListener("stdout"' in js + assert 'addEventListener("stderr"' in js diff --git a/l4d2web/tests/test_job_worker.py b/l4d2web/tests/test_job_worker.py index f4f8b67..49df447 100644 --- a/l4d2web/tests/test_job_worker.py +++ 
b/l4d2web/tests/test_job_worker.py @@ -1,8 +1,16 @@ from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from types import SimpleNamespace +import subprocess import pytest +from sqlalchemy import select -from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs +from l4d2web.auth import hash_password +from l4d2web.db import init_db, session_scope +from l4d2web.models import Blueprint, Job, Server, User +from l4d2web.services import l4d2_facade +from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs, run_worker_once @dataclass @@ -12,54 +20,303 @@ class DummyJob: @pytest.fixture -def worker_fixture(tmp_path, monkeypatch): +def worker_app(tmp_path, monkeypatch): from l4d2web.app import create_app - from l4d2web.db import init_db db_url = f"sqlite:///{tmp_path/'worker.db'}" monkeypatch.setenv("DATABASE_URL", db_url) app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) init_db() - - class WorkerFixture: - def run_once(self): - state = SchedulerState() - - state.running_servers.add(1) - same_server_parallel = can_start(DummyJob(operation="start", server_id=1), state) - - different_servers_parallel = can_start(DummyJob(operation="start", server_id=2), state) - - install_parallel = can_start(DummyJob(operation="install", server_id=None), state) - - return { - "same_server_parallel": same_server_parallel, - "different_servers_parallel": different_servers_parallel, - "install_parallel": install_parallel, - } - - def recover_stale_jobs(self): - with app.app_context(): - return recover_stale_jobs() - - return WorkerFixture() + return app -def test_same_server_jobs_serialized(worker_fixture) -> None: - result = worker_fixture.run_once() - assert result["same_server_parallel"] is False +@pytest.fixture +def seeded_worker(worker_app): + with session_scope() as session: + user = User(username="alice", password_digest=hash_password("secret"), admin=False) + 
session.add(user) + session.flush() + + blueprint = Blueprint(user_id=user.id, name="default", arguments="[]", config="[]") + session.add(blueprint) + session.flush() + + server_one = Server( + user_id=user.id, + blueprint_id=blueprint.id, + name="alpha", + port=27015, + last_error="old error", + ) + server_two = Server(user_id=user.id, blueprint_id=blueprint.id, name="bravo", port=27016) + session.add_all([server_one, server_two]) + session.flush() + + ids = SimpleNamespace(user=user.id, server_one=server_one.id, server_two=server_two.id) + + return worker_app, ids -def test_different_servers_can_run_parallel(worker_fixture) -> None: - result = worker_fixture.run_once() - assert result["different_servers_parallel"] is True +def add_job( + user_id: int, + operation: str, + *, + server_id: int | None, + state: str = "queued", + created_at: datetime | None = None, +) -> int: + now = datetime.now(UTC) + with session_scope() as session: + job = Job( + user_id=user_id, + server_id=server_id, + operation=operation, + state=state, + created_at=created_at or now, + updated_at=created_at or now, + ) + if state == "running": + job.started_at = now + session.add(job) + session.flush() + return job.id -def test_install_global_exclusive(worker_fixture) -> None: - result = worker_fixture.run_once() - assert result["install_parallel"] is False +def load_job(job_id: int) -> Job: + with session_scope() as session: + job = session.scalar(select(Job).where(Job.id == job_id)) + assert job is not None + return job -def test_recover_stale_running_jobs(worker_fixture) -> None: - recovered = worker_fixture.recover_stale_jobs() - assert recovered >= 0 +def test_scheduler_predicates() -> None: + state = SchedulerState() + state.running_servers.add(1) + + assert can_start(DummyJob(operation="start", server_id=1), state) is False + assert can_start(DummyJob(operation="start", server_id=2), state) is True + assert can_start(DummyJob(operation="install", server_id=None), state) is False + + 
+def test_run_worker_once_claims_oldest_runnable_job(seeded_worker, monkeypatch) -> None: + app, ids = seeded_worker + calls = [] + older = datetime.now(UTC) - timedelta(minutes=2) + newer = datetime.now(UTC) + old_job_id = add_job(ids.user, "initialize", server_id=ids.server_one, created_at=older) + new_job_id = add_job(ids.user, "initialize", server_id=ids.server_two, created_at=newer) + + monkeypatch.setattr(l4d2_facade, "initialize_server", lambda server_id, **kwargs: calls.append(server_id)) + monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped")) + + with app.app_context(): + assert run_worker_once() is True + + assert calls == [ids.server_one] + assert load_job(old_job_id).state == "succeeded" + assert load_job(new_job_id).state == "queued" + + +def test_successful_start_job_logs_and_refreshes_server_state(seeded_worker, monkeypatch) -> None: + app, ids = seeded_worker + job_id = add_job(ids.user, "start", server_id=ids.server_one) + calls = [] + + def fake_initialize(server_id, *, on_stdout=None, on_stderr=None): + calls.append(("initialize", server_id)) + on_stdout("initialized") + on_stderr("init warning") + + def fake_start(server_id, *, on_stdout=None, on_stderr=None): + calls.append(("start", server_id)) + on_stdout("started") + + monkeypatch.setattr(l4d2_facade, "initialize_server", fake_initialize) + monkeypatch.setattr(l4d2_facade, "start_server", fake_start) + monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="running")) + + with app.app_context(): + assert run_worker_once() is True + + with session_scope() as session: + job = session.scalar(select(Job).where(Job.id == job_id)) + server = session.scalar(select(Server).where(Server.id == ids.server_one)) + lines = [(row.seq, row.stream, row.line) for row in job_logs_for(session, job_id)] + + assert calls == [("initialize", ids.server_one), ("start", ids.server_one)] + assert job is not None + assert job.state == "succeeded" 
+    assert job.exit_code == 0
+    assert job.started_at is not None
+    assert job.finished_at is not None
+    assert job.updated_at is not None
+    assert lines == [
+        (1, "stdout", "initialized"),
+        (2, "stderr", "init warning"),
+        (3, "stdout", "started"),
+    ]
+    assert server is not None
+    assert server.actual_state == "running"
+    assert server.last_error == ""
+
+
+def job_logs_for(session, job_id: int):
+    from l4d2web.models import JobLog
+
+    return session.scalars(select(JobLog).where(JobLog.job_id == job_id).order_by(JobLog.seq)).all()
+
+
+def test_called_process_error_fails_job_and_sets_server_error(seeded_worker, monkeypatch) -> None:
+    app, ids = seeded_worker
+    job_id = add_job(ids.user, "stop", server_id=ids.server_one)
+
+    def fail_stop(server_id, **kwargs):
+        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")
+
+    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
+    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
+
+    with app.app_context():
+        assert run_worker_once() is True
+
+    with session_scope() as session:
+        job = session.scalar(select(Job).where(Job.id == job_id))
+        server = session.scalar(select(Server).where(Server.id == ids.server_one))
+        lines = [row.line for row in job_logs_for(session, job_id)]
+
+    assert job is not None
+    assert job.state == "failed"
+    assert job.exit_code == 7
+    assert server is not None
+    assert server.last_error == "stop failed"
+    assert "stop failed" in lines
+
+
+def test_refresh_failure_does_not_hide_operation_failure(seeded_worker, monkeypatch) -> None:
+    app, ids = seeded_worker
+    job_id = add_job(ids.user, "stop", server_id=ids.server_one)
+
+    def fail_stop(server_id, **kwargs):
+        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")
+
+    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
+    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: (_ for _ in ()).throw(RuntimeError("status down")))
+
+    with app.app_context():
+        assert run_worker_once() is True
+
+    with session_scope() as session:
+        job = session.scalar(select(Job).where(Job.id == job_id))
+        server = session.scalar(select(Server).where(Server.id == ids.server_one))
+        lines = [row.line for row in job_logs_for(session, job_id)]
+
+    assert job is not None
+    assert job.state == "failed"
+    assert job.exit_code == 7
+    assert server is not None
+    assert server.last_error == "stop failed"
+    assert "status refresh failed: status down" in lines
+
+
+def test_unexpected_exception_fails_job_with_exit_code_one(seeded_worker, monkeypatch) -> None:
+    app, ids = seeded_worker
+    job_id = add_job(ids.user, "delete", server_id=ids.server_one)
+
+    monkeypatch.setattr(l4d2_facade, "delete_server", lambda server_id, **kwargs: (_ for _ in ()).throw(RuntimeError("boom")))
+    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))
+
+    with app.app_context():
+        assert run_worker_once() is True
+
+    job = load_job(job_id)
+    assert job.state == "failed"
+    assert job.exit_code == 1
+
+
+def test_same_server_jobs_do_not_overlap(seeded_worker, monkeypatch) -> None:
+    app, ids = seeded_worker
+    add_job(ids.user, "start", server_id=ids.server_one, state="running")
+    queued_id = add_job(ids.user, "stop", server_id=ids.server_one)
+    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))
+
+    with app.app_context():
+        assert run_worker_once() is False
+
+    assert load_job(queued_id).state == "queued"
+
+
+def test_different_server_jobs_can_be_claimed_while_other_server_runs(seeded_worker, monkeypatch) -> None:
+    app, ids = seeded_worker
+    add_job(ids.user, "start", server_id=ids.server_one, state="running")
+    queued_id = add_job(ids.user, "stop", server_id=ids.server_two)
+    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: None)
+    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
+
+    with app.app_context():
+        assert run_worker_once() is True
+
+    assert load_job(queued_id).state == "succeeded"
+
+
+def test_install_job_not_claimed_while_server_job_runs(seeded_worker) -> None:
+    app, ids = seeded_worker
+    add_job(ids.user, "start", server_id=ids.server_one, state="running")
+    install_id = add_job(ids.user, "install", server_id=None)
+
+    with app.app_context():
+        assert run_worker_once() is False
+
+    assert load_job(install_id).state == "queued"
+
+
+def test_server_job_not_claimed_while_install_runs(seeded_worker) -> None:
+    app, ids = seeded_worker
+    add_job(ids.user, "install", server_id=None, state="running")
+    queued_id = add_job(ids.user, "initialize", server_id=ids.server_one)
+
+    with app.app_context():
+        assert run_worker_once() is False
+
+    assert load_job(queued_id).state == "queued"
+
+
+def test_recover_stale_running_jobs(worker_app) -> None:
+    with session_scope() as session:
+        user = User(username="bob", password_digest=hash_password("secret"), admin=False)
+        session.add(user)
+        session.flush()
+        job = Job(user_id=user.id, server_id=None, operation="install", state="running")
+        session.add(job)
+        session.flush()
+        job_id = job.id
+
+    with worker_app.app_context():
+        assert recover_stale_jobs() == 1
+
+    recovered = load_job(job_id)
+    assert recovered.state == "failed"
+    assert recovered.finished_at is not None
+
+
+def test_worker_startup_skipped_during_testing(monkeypatch, tmp_path) -> None:
+    from l4d2web import app as app_module
+
+    called = []
+    db_url = f"sqlite:///{tmp_path/'startup.db'}"
+    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))
+
+    app_module.create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
+
+    assert called == []
+
+
+def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
+    from l4d2web import app as app_module
+
+    called = []
+    db_url = f"sqlite:///{tmp_path/'startup-enabled.db'}"
+    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))
+
+    app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
+
+    assert called == [app]
diff --git a/l4d2web/tests/test_pages.py b/l4d2web/tests/test_pages.py
index 68025d4..3f6b41a 100644
--- a/l4d2web/tests/test_pages.py
+++ b/l4d2web/tests/test_pages.py
@@ -4,7 +4,7 @@
 from pathlib import Path
 from l4d2web.app import create_app
 from l4d2web.auth import hash_password
 from l4d2web.db import init_db, session_scope
-from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, Server, User
+from l4d2web.models import Blueprint, BlueprintOverlay, Job, Overlay, Server, User
 
 
@@ -161,16 +161,50 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
     with client.session_transaction() as sess:
         sess["user_id"] = admin_id
 
-    assert client.get("/admin").status_code == 200
+    admin_page = client.get("/admin")
+    assert admin_page.status_code == 200
+    assert 'action="/admin/install"' in admin_page.get_data(as_text=True)
     assert client.get("/admin/users").status_code == 200
     assert client.get("/admin/jobs").status_code == 200
     assert 'href="/admin"' in client.get("/dashboard").get_data(as_text=True)
 
 
+def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
+    db_url = f"sqlite:///{tmp_path/'admin-install.db'}"
+    monkeypatch.setenv("DATABASE_URL", db_url)
+    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
+    init_db()
+
+    with session_scope() as session:
+        admin = User(username="admin", password_digest=hash_password("secret"), admin=True)
+        session.add(admin)
+        session.flush()
+        admin_id = admin.id
+
+    client = app.test_client()
+    with client.session_transaction() as sess:
+        sess["user_id"] = admin_id
+        sess["csrf_token"] = "test-token"
+
+    response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
+
+    assert response.status_code == 302
+    assert response.headers["Location"].endswith("/admin/jobs")
+    with session_scope() as session:
+        job = session.query(Job).one()
+        assert job.user_id == admin_id
+        assert job.server_id is None
+        assert job.operation == "install"
+        assert job.state == "queued"
+
+
 def test_non_admin_cannot_open_admin_pages(auth_client_with_server) -> None:
     assert auth_client_with_server.get("/admin").status_code == 403
     assert auth_client_with_server.get("/admin/users").status_code == 403
     assert auth_client_with_server.get("/admin/jobs").status_code == 403
+    with auth_client_with_server.session_transaction() as sess:
+        sess["csrf_token"] = "test-token"
+    assert auth_client_with_server.post("/admin/install", headers={"X-CSRF-Token": "test-token"}).status_code == 403
 
 
 def test_anonymous_protected_page_redirects_to_login_with_next(tmp_path, monkeypatch) -> None: