feat(l4d2-web): execute queued lifecycle jobs

This commit is contained in:
mwiegand 2026-05-06 14:08:18 +02:00
parent df680f6226
commit 91d042cf33
11 changed files with 891 additions and 62 deletions

View file

@@ -0,0 +1,278 @@
# L4D2 Web Queue Worker Implementation Plan

> **Approval gate:** This plan may be written and refined without further approval. Do not implement code changes from this plan until the user explicitly approves implementation.

**Goal:** Complete the `l4d2web` async lifecycle queue so queued jobs are claimed, executed through the direct `l4d2host` Python APIs, logged to `job_logs`, reflected in server state, and streamed live to the UI.

**Architecture:** Keep the v1 single-process Flask architecture. Use DB-backed queued jobs as the durable source of truth, worker threads inside the Flask process, SQLite-safe process-local locks, and direct imports through `l4d2web.services.l4d2_facade`. Do not shell out to `l4d2ctl` from the web app.

---
## Current Gap

- Server lifecycle routes create `Job(state="queued")` rows.
- `l4d2web.services.job_worker` has scheduler helpers, stale recovery, command-log append, and actual-state refresh helpers.
- No worker claims queued jobs.
- No code dispatches queued operations to `l4d2_facade`.
- No command callbacks persist live stdout/stderr while jobs run.
- Job-log SSE currently replays existing rows once and does not live-follow new rows.
- Job-log SSE emits `stdout`/`stderr` custom events, while `static/js/sse.js` only handles default messages.
- No web route currently enqueues global `install` jobs.

---
## Locked Decisions

- Queue execution uses direct Python imports through `l4d2web.services.l4d2_facade`.
- The queue is DB-backed, not an in-memory `queue.Queue`.
- Worker threads are in-process daemon threads.
- SQLite concurrency is protected with process-local locks; no distributed lock manager is added.
- Workers are not started during normal tests.
- `POST /admin/install` is added as the admin-only runtime install/update entry point.
- `install` jobs have `server_id=None` and are globally exclusive.
- Server-specific jobs do not overlap on the same `server_id`.
- Different server jobs can run concurrently when no install job is running.
- A web `start` job applies the live-linked blueprint before start by running `initialize_server(server_id)` and then `start_server(server_id)`. This satisfies "blueprint updates apply on next action."
- `delete` removes the host instance/runtime through `l4d2host`; it does not delete the web `Server` row in v1.
- Command log rows are retained indefinitely.

---
## Task 1: Extend Worker Tests First

**Files:**

- Modify: `l4d2web/tests/test_job_worker.py`
- Modify as needed: `l4d2web/tests/test_job_logs.py`

Add tests that verify the worker behavior without touching real systemd, Steam, or `/opt/l4d2`. Use monkeypatched `l4d2web.services.l4d2_facade` functions.

Required coverage:

- `run_worker_once()` claims the oldest runnable queued job.
- A successful server job transitions `queued -> running -> succeeded` and sets `exit_code=0`, `started_at`, `finished_at`, and `updated_at`.
- A successful job persists stdout/stderr callback lines in `job_logs`.
- A `subprocess.CalledProcessError` transitions the job to `failed` and stores `exit_code=exc.returncode`.
- An unexpected exception transitions the job to `failed` with `exit_code=1`.
- Same-server jobs do not overlap.
- Different-server jobs can be claimed concurrently by separate worker passes.
- An `install` job is not claimed while any server job is running.
- Server jobs are not claimed while an `install` job is running.
- Startup recovery marks stale `running` jobs as `failed`.
- Actual server state is refreshed after server-specific lifecycle jobs.
- `Server.last_error` is cleared on success and set on failure.

Verification command:

```bash
pytest l4d2web/tests/test_job_worker.py -q
```

Expected before implementation: FAIL.

---
## Task 2: Implement Queue Claiming And Job Execution

**Files:**

- Modify: `l4d2web/services/job_worker.py`

Add worker-core functions:

- `build_scheduler_state(session) -> SchedulerState`
- `claim_next_job() -> int | None`
- `run_worker_once() -> bool`
- `run_job(job_id: int) -> None`
- `finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None`
- `append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int`

Implementation rules:

- Use a module-level claim lock around scheduler-state construction, queued-job selection, and the `queued -> running` transition.
- Commit the `running` transition before executing any host operation.
- Do not keep a DB session open while a host operation runs.
- Use a module-level log lock around `append_job_log()` so concurrent stdout/stderr callback threads cannot duplicate `seq` values.
- Recompute scheduler state from `running` jobs in the DB, not from in-memory state alone.
- Select queued jobs by `created_at`, then `id`, for deterministic order.
- Do not leave malformed server operations (no `server_id`) stuck in the queue: claim them and fail them cleanly.
- Treat unknown operations as failed jobs, not worker-thread crashes.
Operation dispatch:

```text
install    -> l4d2_facade.install_runtime(...)
initialize -> l4d2_facade.initialize_server(server_id, ...)
start      -> l4d2_facade.initialize_server(server_id, ...), then l4d2_facade.start_server(server_id, ...)
stop       -> l4d2_facade.stop_server(server_id, ...)
delete     -> l4d2_facade.delete_server(server_id, ...)
```
Failure handling:

- `subprocess.CalledProcessError`: append remaining stderr if useful, fail with `exit_code=returncode`.
- Any other exception: append the exception text to stderr, fail with `exit_code=1`.
- Never let a job exception kill the worker loop.

Verification command:

```bash
pytest l4d2web/tests/test_job_worker.py -q
```

Expected after implementation: PASS.

---
## Task 3: Add Worker Thread Startup

**Files:**

- Modify: `l4d2web/config.py`
- Modify: `l4d2web/app.py`
- Modify: `l4d2web/services/job_worker.py`
- Modify: `l4d2web/tests/test_job_worker.py`

Add config:

```python
"JOB_WORKER_ENABLED": True
"JOB_WORKER_POLL_SECONDS": 1
```

Add worker lifecycle functions:

- `start_job_workers(app) -> None`
- `worker_loop(app, poll_seconds: float) -> None`

Startup behavior:

- `create_app()` still calls `recover_stale_jobs()`.
- After recovery, `create_app()` starts workers only when enabled and not in `TESTING`.
- Guard against duplicate worker startup in the same process.
- Worker threads run as daemon threads.
- Each worker loop uses `app.app_context()` around `run_worker_once()`.
- If no job was run, sleep for `JOB_WORKER_POLL_SECONDS`.
Testing requirements:

- Tests should not accidentally start real background workers.
- Add a focused startup test with a monkeypatched `start_job_workers` if needed.

Verification command:

```bash
pytest l4d2web/tests/test_job_worker.py -q
```

---
## Task 4: Make Job Log SSE Live-Follow

**Files:**

- Modify: `l4d2web/routes/job_routes.py`
- Modify: `l4d2web/static/js/sse.js`
- Modify: `l4d2web/tests/test_job_logs.py`

Route behavior:

- Authorize the job before streaming.
- Replay rows with `seq > last_seq` up to `JOB_LOG_REPLAY_LIMIT`.
- Continue polling for new rows while the job is not terminal.
- Close the stream after all available logs are sent and the job state is terminal.
- Keep emitting `id: <seq>` so EventSource can resume.
- Keep `event: stdout` and `event: stderr` for job logs.
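
One way to frame each row so the `id:`/`event:`/`data:` fields stay valid SSE even for multi-line payloads (a sketch of the framing the route behavior above relies on):

```python
def format_sse_event(seq: int, event: str, data: str) -> str:
    """Frame one job-log row as a Server-Sent Event.

    Each payload line gets its own 'data:' field, because a raw newline
    inside a single 'data:' field would terminate the event early.
    """
    lines = [f"id: {seq}", f"event: {event}"]
    for line in data.splitlines() or [""]:
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"
```

Emitting `id:` first lets the browser's `EventSource` send `Last-Event-ID` on reconnect, which is what makes resumable replay possible.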
JS behavior:

- Keep handling default server-log messages via `source.onmessage`.
- Also register `stdout` and `stderr` listeners that append job-log lines to the same element.
- Prefix custom job-log events with the stream name only if useful for readability.

Terminal states:

```text
succeeded
failed
cancelled
```

`cancelled` is reserved for future use and does not require cancellation support in this task.

Verification command:

```bash
pytest l4d2web/tests/test_job_logs.py -q
```

---
## Task 5: Add Admin Runtime Install Action

**Files:**

- Modify: `l4d2web/routes/page_routes.py`
- Modify: `l4d2web/templates/admin.html`
- Modify: `l4d2web/tests/test_pages.py`, or add a focused admin route test

Behavior:

- `POST /admin/install` requires `@require_admin`.
- Creates `Job(user_id=current_admin.id, server_id=None, operation="install", state="queued")`.
- Redirects to `/admin/jobs`.
- Non-admin logged-in users receive `403`.
- Anonymous users are redirected to login.
- The admin page shows a CSRF-protected form/button for runtime install/update.

Verification command:

```bash
pytest l4d2web/tests/test_pages.py -q
```

---
## Task 6: Full Verification And Review

Run focused suites first:

```bash
pytest l4d2web/tests/test_job_worker.py -q
pytest l4d2web/tests/test_job_logs.py -q
pytest l4d2web/tests/test_pages.py -q
```

Then run the full web suite:

```bash
pytest l4d2web/tests -q
```

Refresh the code index after implementation:

```bash
ccc index
```

Request a final read-only review focused on:

- queue claiming races
- duplicate worker startup
- job-log sequence ordering
- error handling and `last_error`
- live SSE behavior
- `start` applying blueprint updates before host start

---
## Commit Strategy

Use small commits after passing the relevant tests:

1. `feat(l4d2-web): execute queued lifecycle jobs`
2. `feat(l4d2-web): live-follow queued job logs`
3. `feat(l4d2-web): add admin runtime install job`

Do not commit unless the user explicitly asks for commits.

---

## Open Approval Gate

Before modifying implementation files, ask the user for explicit approval to proceed with the queue-worker implementation.

View file

@@ -15,7 +15,7 @@ from l4d2web.routes.log_routes import bp as log_bp
 from l4d2web.routes.overlay_routes import bp as overlay_bp
 from l4d2web.routes.page_routes import bp as page_bp
 from l4d2web.routes.server_routes import bp as server_bp
-from l4d2web.services.job_worker import recover_stale_jobs
+from l4d2web.services.job_worker import recover_stale_jobs, start_job_workers

 def create_app(test_config: dict[str, object] | None = None) -> Flask:
@@ -63,6 +63,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
     if app.config.get("TESTING"):
         reset_login_rate_limits()
     recover_stale_jobs()
+    if app.config.get("JOB_WORKER_ENABLED") and not app.config.get("TESTING"):
+        start_job_workers(app)

     @app.get("/health")
     def health():

View file

@@ -3,6 +3,8 @@ DEFAULT_CONFIG: dict[str, object] = {
     "DATABASE_URL": "sqlite:///l4d2web.db",
     "STATUS_REFRESH_SECONDS": 8,
     "JOB_WORKER_THREADS": 4,
+    "JOB_WORKER_ENABLED": True,
+    "JOB_WORKER_POLL_SECONDS": 1,
     "JOB_LOG_REPLAY_LIMIT": 2000,
     "JOB_LOG_LINE_MAX_CHARS": 4096,
 }

View file

@@ -1,3 +1,5 @@
+import time
+
 from flask import Blueprint, Response, current_app, request
 from sqlalchemy import select
@@ -7,6 +9,14 @@ from l4d2web.models import Job, JobLog
 bp = Blueprint("job", __name__)

+TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
+
+
+def format_sse_event(seq: int, event: str, data: str) -> str:
+    lines = [f"id: {seq}", f"event: {event}"]
+    for line in data.splitlines() or [""]:
+        lines.append(f"data: {line}")
+    return "\n".join(lines) + "\n\n"
+

 @bp.get("/jobs/<int:job_id>/stream")
@@ -15,8 +25,9 @@ def stream_job(job_id: int) -> Response:
     user = current_user()
     assert user is not None
-    last_seq = int(request.args.get("last_seq", "0"))
+    last_seq = int(request.args.get("last_seq") or request.headers.get("Last-Event-ID") or "0")
     limit = int(current_app.config["JOB_LOG_REPLAY_LIMIT"])
+    poll_seconds = float(current_app.config.get("JOB_WORKER_POLL_SECONDS", 1))

     with session_scope() as db:
         job = db.scalar(select(Job).where(Job.id == job_id, Job.user_id == user.id))
@@ -24,16 +35,26 @@ def stream_job(job_id: int) -> Response:
         return Response(status=404)

     def generate():
+        next_seq = last_seq
+        while True:
             with session_scope() as db:
+                job = db.scalar(select(Job).where(Job.id == job_id))
+                if job is None:
+                    return
                 rows = db.scalars(
                     select(JobLog)
-                    .where(JobLog.job_id == job_id, JobLog.seq > last_seq)
+                    .where(JobLog.job_id == job_id, JobLog.seq > next_seq)
                     .order_by(JobLog.seq)
                     .limit(limit)
                 ).all()
+                terminal = job.state in TERMINAL_JOB_STATES
             for row in rows:
-                yield f"id: {row.seq}\n"
-                yield f"event: {row.stream}\n"
-                yield f"data: {row.line}\n\n"
+                next_seq = row.seq
+                yield format_sse_event(row.seq, row.stream, row.line)
+            if terminal and len(rows) < limit:
+                return
+            time.sleep(poll_seconds)

     return Response(generate(), mimetype="text/event-stream")

View file

@@ -1,6 +1,6 @@
 import json

-from flask import Blueprint, Response, current_app, render_template
+from flask import Blueprint, Response, redirect, render_template
 from sqlalchemy import select

 from l4d2web.auth import current_user, require_admin, require_login
@@ -24,6 +24,16 @@ def admin_home() -> str:
     return render_template("admin.html")

+
+@bp.post("/admin/install")
+@require_admin
+def enqueue_runtime_install() -> Response:
+    user = current_user()
+    assert user is not None
+    with session_scope() as db:
+        db.add(Job(user_id=user.id, server_id=None, operation="install", state="queued"))
+    return redirect("/admin/jobs")
+

 @bp.get("/admin/users")
 @require_admin
 def admin_users() -> str:
@@ -151,4 +161,3 @@ def blueprint_page(blueprint_id: int):
         arguments=json.loads(blueprint.arguments),
         config_lines=json.loads(blueprint.config),
     )

View file

@@ -1,5 +1,8 @@
 from dataclasses import dataclass, field
 from datetime import UTC, datetime
+import subprocess
+import threading
+import time

 from sqlalchemy import func, select
 from sqlalchemy.orm import Session
@@ -8,6 +11,15 @@ from l4d2web.db import session_scope
 from l4d2web.models import Job, JobLog, Server

+TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
+SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}
+
+_claim_lock = threading.Lock()
+_log_lock = threading.RLock()
+_worker_start_lock = threading.Lock()
+_workers_started = False
+

 @dataclass
 class SchedulerState:
     install_running: bool = False
@@ -24,12 +36,128 @@ def can_start(job, state: SchedulerState) -> bool:
     return job.server_id not in state.running_servers

+
+def build_scheduler_state(session: Session) -> SchedulerState:
+    state = SchedulerState()
+    running_jobs = session.scalars(select(Job).where(Job.state == "running")).all()
+    for job in running_jobs:
+        if job.operation == "install":
+            state.install_running = True
+        elif job.server_id is not None:
+            state.running_servers.add(job.server_id)
+    return state
+
+
+def claim_next_job() -> int | None:
+    with _claim_lock:
+        with session_scope() as db:
+            state = build_scheduler_state(db)
+            jobs = db.scalars(select(Job).where(Job.state == "queued").order_by(Job.created_at, Job.id)).all()
+            now = datetime.now(UTC)
+            for job in jobs:
+                malformed_server_job = job.operation != "install" and job.server_id is None
+                if not malformed_server_job and not can_start(job, state):
+                    continue
+                job.state = "running"
+                job.started_at = now
+                job.updated_at = now
+                job.exit_code = None
+                db.flush()
+                return job.id
+    return None
+
+
+def run_worker_once() -> bool:
+    job_id = claim_next_job()
+    if job_id is None:
+        return False
+    run_job(job_id)
+    return True
+
+
+def run_job(job_id: int) -> None:
+    from l4d2web.services import l4d2_facade
+
+    with session_scope() as db:
+        job = db.scalar(select(Job).where(Job.id == job_id))
+        if job is None:
+            return
+        operation = job.operation
+        server_id = job.server_id
+
+    max_chars = 4096
+
+    def on_stdout(line: str) -> None:
+        append_job_log_line(job_id, "stdout", line, max_chars=max_chars)
+
+    def on_stderr(line: str) -> None:
+        append_job_log_line(job_id, "stderr", line, max_chars=max_chars)
+
+    try:
+        if operation == "install":
+            l4d2_facade.install_runtime(on_stdout=on_stdout, on_stderr=on_stderr)
+        elif operation in SERVER_OPERATIONS and server_id is None:
+            raise ValueError(f"{operation} job has no server_id")
+        elif operation == "initialize":
+            l4d2_facade.initialize_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr)
+        elif operation == "start":
+            l4d2_facade.initialize_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr)
+            l4d2_facade.start_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr)
+        elif operation == "stop":
+            l4d2_facade.stop_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr)
+        elif operation == "delete":
+            l4d2_facade.delete_server(server_id, on_stdout=on_stdout, on_stderr=on_stderr)
+        else:
+            raise ValueError(f"unknown job operation: {operation}")
+        if server_id is not None:
+            refresh_server_actual_state_after_job(job_id, server_id)
+        finish_job(job_id, "succeeded", 0)
+    except subprocess.CalledProcessError as exc:
+        error = exc.stderr or str(exc)
+        if exc.stderr:
+            append_job_log_line(job_id, "stderr", str(exc.stderr), max_chars=max_chars)
+        if server_id is not None:
+            refresh_server_actual_state_after_job(job_id, server_id)
+        finish_job(job_id, "failed", exc.returncode, error=error)
+    except Exception as exc:
+        error = str(exc)
+        append_job_log_line(job_id, "stderr", error, max_chars=max_chars)
+        if server_id is not None:
+            refresh_server_actual_state_after_job(job_id, server_id)
+        finish_job(job_id, "failed", 1, error=error)
+
+
+def finish_job(job_id: int, state: str, exit_code: int | None, error: str = "") -> None:
+    now = datetime.now(UTC)
+    with session_scope() as db:
+        job = db.scalar(select(Job).where(Job.id == job_id))
+        if job is None:
+            return
+        job.state = state
+        job.exit_code = exit_code
+        job.finished_at = now
+        job.updated_at = now
+        if job.server_id is not None:
+            server = db.scalar(select(Server).where(Server.id == job.server_id))
+            if server is not None:
+                server.last_error = "" if state == "succeeded" else error
+                server.updated_at = now
+
+
+def append_job_log_line(job_id: int, stream: str, line: str, max_chars: int = 4096) -> int:
+    with _log_lock:
+        with session_scope() as db:
+            return append_job_log(db, job_id, stream, line, max_chars=max_chars)
+

 def recover_stale_jobs() -> int:
     now = datetime.now(UTC)
     with session_scope() as db:
         jobs = db.scalars(select(Job).where(Job.state == "running")).all()
         for job in jobs:
             job.state = "failed"
+            job.exit_code = 1
             job.finished_at = now
             job.updated_at = now
         return len(jobs)
@@ -42,6 +170,7 @@ def append_job_log(
     line: str,
     max_chars: int = 4096,
 ) -> int:
+    with _log_lock:
         last_seq = session.scalar(select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)) or 0
         next_seq = int(last_seq) + 1
         session.add(JobLog(job_id=job_id, seq=next_seq, stream=stream, line=line[:max_chars]))
@@ -62,3 +191,41 @@ def refresh_server_actual_state(server_id: int) -> str:
         server.actual_state_updated_at = now
         server.updated_at = now
         return server.actual_state
+
+
+def refresh_server_actual_state_after_job(job_id: int, server_id: int) -> None:
+    try:
+        refresh_server_actual_state(server_id)
+    except Exception as exc:
+        append_job_log_line(job_id, "stderr", f"status refresh failed: {exc}")
+
+
+def start_job_workers(app) -> None:
+    global _workers_started
+    with _worker_start_lock:
+        if _workers_started:
+            return
+        _workers_started = True
+    threads = int(app.config.get("JOB_WORKER_THREADS", 4))
+    poll_seconds = float(app.config.get("JOB_WORKER_POLL_SECONDS", 1))
+    for index in range(threads):
+        thread = threading.Thread(
+            target=worker_loop,
+            args=(app, poll_seconds),
+            name=f"l4d2-job-worker-{index + 1}",
+            daemon=True,
+        )
+        thread.start()
+
+
+def worker_loop(app, poll_seconds: float) -> None:
+    while True:
+        ran_job = False
+        try:
+            with app.app_context():
+                ran_job = run_worker_once()
+        except Exception:
+            ran_job = False
+        if not ran_job:
+            time.sleep(poll_seconds)

View file

@@ -5,10 +5,23 @@ function streamTextToElement(element) {
     }
     const source = new EventSource(url);
-    source.onmessage = (event) => {
-        element.textContent += `${event.data}\n`;
+    const appendLine = (line) => {
+        element.textContent += `${line}\n`;
         element.scrollTop = element.scrollHeight;
     };
+    source.onmessage = (event) => {
+        appendLine(event.data);
+    };
+    source.addEventListener("stdout", (event) => {
+        appendLine(event.data);
+    });
+    source.addEventListener("stderr", (event) => {
+        appendLine(`[stderr] ${event.data}`);
+    });
 }

 document.addEventListener("DOMContentLoaded", () => {

View file

@@ -10,4 +10,13 @@
     <li><a href="/admin/jobs">Jobs</a></li>
   </ul>
 </section>
+<section class="panel">
+  <h2>Runtime</h2>
+  <p class="muted">Queue a Steam runtime install/update job for the local host.</p>
+  <form method="post" action="/admin/install">
+    <input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
+    <button type="submit">Install or update runtime</button>
+  </form>
+</section>
 {% endblock %}

View file

@@ -1,3 +1,5 @@
+from pathlib import Path
+
 from sqlalchemy import text
 import pytest
@@ -21,7 +23,7 @@ def seeded_job_logs(tmp_path, monkeypatch):
     session.add(user)
     session.flush()
-    job = Job(user_id=user.id, server_id=None, operation="install", state="queued")
+    job = Job(user_id=user.id, server_id=None, operation="install", state="succeeded")
     session.add(job)
     session.flush()
@@ -67,3 +69,38 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
     response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
     assert response.status_code == 200
+
+
+def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
+    app, job_id, user_id = seeded_job_logs
+    client = app.test_client()
+    with client.session_transaction() as sess:
+        sess["user_id"] = user_id
+
+    response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
+    text = response.get_data(as_text=True)
+    assert "id: 6\n" in text
+    assert "event: stdout\n" in text
+    assert "data: line-6\n\n" in text
+    assert "data: line-5\n\n" not in text
+
+
+def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
+    app, job_id, user_id = seeded_job_logs
+    client = app.test_client()
+    with client.session_transaction() as sess:
+        sess["user_id"] = user_id
+
+    response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
+    text = response.get_data(as_text=True)
+    assert "data: line-7\n\n" in text
+    assert "data: line-6\n\n" not in text
+
+
+def test_sse_js_handles_job_log_custom_events() -> None:
+    js = Path("l4d2web/static/js/sse.js").read_text()
+    assert 'addEventListener("stdout"' in js
+    assert 'addEventListener("stderr"' in js

View file

@ -1,8 +1,16 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
import subprocess
import pytest import pytest
from sqlalchemy import select
from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, Job, Server, User
from l4d2web.services import l4d2_facade
from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs, run_worker_once
@dataclass @dataclass
@ -12,54 +20,303 @@ class DummyJob:
@pytest.fixture @pytest.fixture
def worker_fixture(tmp_path, monkeypatch): def worker_app(tmp_path, monkeypatch):
from l4d2web.app import create_app from l4d2web.app import create_app
from l4d2web.db import init_db
db_url = f"sqlite:///{tmp_path/'worker.db'}" db_url = f"sqlite:///{tmp_path/'worker.db'}"
monkeypatch.setenv("DATABASE_URL", db_url) monkeypatch.setenv("DATABASE_URL", db_url)
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db() init_db()
return app
class WorkerFixture:
def run_once(self): @pytest.fixture
def seeded_worker(worker_app):
with session_scope() as session:
user = User(username="alice", password_digest=hash_password("secret"), admin=False)
session.add(user)
session.flush()
blueprint = Blueprint(user_id=user.id, name="default", arguments="[]", config="[]")
session.add(blueprint)
session.flush()
server_one = Server(
user_id=user.id,
blueprint_id=blueprint.id,
name="alpha",
port=27015,
last_error="old error",
)
server_two = Server(user_id=user.id, blueprint_id=blueprint.id, name="bravo", port=27016)
session.add_all([server_one, server_two])
session.flush()
ids = SimpleNamespace(user=user.id, server_one=server_one.id, server_two=server_two.id)
return worker_app, ids
def add_job(
user_id: int,
operation: str,
*,
server_id: int | None,
state: str = "queued",
created_at: datetime | None = None,
) -> int:
now = datetime.now(UTC)
with session_scope() as session:
job = Job(
user_id=user_id,
server_id=server_id,
operation=operation,
state=state,
created_at=created_at or now,
updated_at=created_at or now,
)
if state == "running":
job.started_at = now
session.add(job)
session.flush()
return job.id
def load_job(job_id: int) -> Job:
with session_scope() as session:
job = session.scalar(select(Job).where(Job.id == job_id))
assert job is not None
return job
def test_scheduler_predicates() -> None:
state = SchedulerState() state = SchedulerState()
state.running_servers.add(1) state.running_servers.add(1)
same_server_parallel = can_start(DummyJob(operation="start", server_id=1), state)
different_servers_parallel = can_start(DummyJob(operation="start", server_id=2), state) assert can_start(DummyJob(operation="start", server_id=1), state) is False
assert can_start(DummyJob(operation="start", server_id=2), state) is True
assert can_start(DummyJob(operation="install", server_id=None), state) is False
install_parallel = can_start(DummyJob(operation="install", server_id=None), state)
return { def test_run_worker_once_claims_oldest_runnable_job(seeded_worker, monkeypatch) -> None:
"same_server_parallel": same_server_parallel, app, ids = seeded_worker
"different_servers_parallel": different_servers_parallel, calls = []
"install_parallel": install_parallel, older = datetime.now(UTC) - timedelta(minutes=2)
} newer = datetime.now(UTC)
old_job_id = add_job(ids.user, "initialize", server_id=ids.server_one, created_at=older)
new_job_id = add_job(ids.user, "initialize", server_id=ids.server_two, created_at=newer)
monkeypatch.setattr(l4d2_facade, "initialize_server", lambda server_id, **kwargs: calls.append(server_id))
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
def recover_stale_jobs(self):
with app.app_context(): with app.app_context():
return recover_stale_jobs() assert run_worker_once() is True
return WorkerFixture() assert calls == [ids.server_one]
assert load_job(old_job_id).state == "succeeded"
assert load_job(new_job_id).state == "queued"
def test_same_server_jobs_serialized(worker_fixture) -> None: def test_successful_start_job_logs_and_refreshes_server_state(seeded_worker, monkeypatch) -> None:
result = worker_fixture.run_once() app, ids = seeded_worker
assert result["same_server_parallel"] is False job_id = add_job(ids.user, "start", server_id=ids.server_one)
calls = []
def fake_initialize(server_id, *, on_stdout=None, on_stderr=None):
calls.append(("initialize", server_id))
on_stdout("initialized")
on_stderr("init warning")
def fake_start(server_id, *, on_stdout=None, on_stderr=None):
calls.append(("start", server_id))
on_stdout("started")
monkeypatch.setattr(l4d2_facade, "initialize_server", fake_initialize)
monkeypatch.setattr(l4d2_facade, "start_server", fake_start)
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="running"))
with app.app_context():
assert run_worker_once() is True
with session_scope() as session:
job = session.scalar(select(Job).where(Job.id == job_id))
server = session.scalar(select(Server).where(Server.id == ids.server_one))
lines = [(row.seq, row.stream, row.line) for row in job_logs_for(session, job_id)]
assert calls == [("initialize", ids.server_one), ("start", ids.server_one)]
assert job is not None
assert job.state == "succeeded"
assert job.exit_code == 0
assert job.started_at is not None
assert job.finished_at is not None
assert job.updated_at is not None
assert lines == [
(1, "stdout", "initialized"),
(2, "stderr", "init warning"),
(3, "stdout", "started"),
]
assert server is not None
assert server.actual_state == "running"
assert server.last_error == ""
def test_different_servers_can_run_parallel(worker_fixture) -> None: def job_logs_for(session, job_id: int):
result = worker_fixture.run_once() from l4d2web.models import JobLog
assert result["different_servers_parallel"] is True
return session.scalars(select(JobLog).where(JobLog.job_id == job_id).order_by(JobLog.seq)).all()
def test_install_global_exclusive(worker_fixture) -> None: def test_called_process_error_fails_job_and_sets_server_error(seeded_worker, monkeypatch) -> None:
result = worker_fixture.run_once() app, ids = seeded_worker
assert result["install_parallel"] is False job_id = add_job(ids.user, "stop", server_id=ids.server_one)
def fail_stop(server_id, **kwargs):
raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")
monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
with app.app_context():
assert run_worker_once() is True
with session_scope() as session:
job = session.scalar(select(Job).where(Job.id == job_id))
server = session.scalar(select(Server).where(Server.id == ids.server_one))
lines = [row.line for row in job_logs_for(session, job_id)]
assert job is not None
assert job.state == "failed"
assert job.exit_code == 7
assert server is not None
assert server.last_error == "stop failed"
assert "stop failed" in lines
def test_refresh_failure_does_not_hide_operation_failure(seeded_worker, monkeypatch) -> None:
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: (_ for _ in ()).throw(RuntimeError("status down")))
    with app.app_context():
        assert run_worker_once() is True
    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]
        assert job is not None
        assert job.state == "failed"
        assert job.exit_code == 7
        assert server is not None
        assert server.last_error == "stop failed"
        assert "status refresh failed: status down" in lines
def test_unexpected_exception_fails_job_with_exit_code_one(seeded_worker, monkeypatch) -> None:
    app, ids = seeded_worker
    job_id = add_job(ids.user, "delete", server_id=ids.server_one)
    monkeypatch.setattr(l4d2_facade, "delete_server", lambda server_id, **kwargs: (_ for _ in ()).throw(RuntimeError("boom")))
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))
    with app.app_context():
        assert run_worker_once() is True
    job = load_job(job_id)
    assert job.state == "failed"
    assert job.exit_code == 1
def test_same_server_jobs_do_not_overlap(seeded_worker, monkeypatch) -> None:
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_one)
    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(queued_id).state == "queued"
def test_different_server_jobs_can_be_claimed_while_other_server_runs(seeded_worker, monkeypatch) -> None:
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_two)
    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: None)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
    with app.app_context():
        assert run_worker_once() is True
    assert load_job(queued_id).state == "succeeded"
def test_install_job_not_claimed_while_server_job_runs(seeded_worker) -> None:
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    install_id = add_job(ids.user, "install", server_id=None)
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(install_id).state == "queued"
def test_server_job_not_claimed_while_install_runs(seeded_worker) -> None:
    app, ids = seeded_worker
    add_job(ids.user, "install", server_id=None, state="running")
    queued_id = add_job(ids.user, "initialize", server_id=ids.server_one)
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(queued_id).state == "queued"
def test_recover_stale_running_jobs(worker_app) -> None:
    with session_scope() as session:
        user = User(username="bob", password_digest=hash_password("secret"), admin=False)
        session.add(user)
        session.flush()
        job = Job(user_id=user.id, server_id=None, operation="install", state="running")
        session.add(job)
        session.flush()
        job_id = job.id
    with worker_app.app_context():
        assert recover_stale_jobs() == 1
    recovered = load_job(job_id)
    assert recovered.state == "failed"
    assert recovered.finished_at is not None
def test_worker_startup_skipped_during_testing(monkeypatch, tmp_path) -> None:
    from l4d2web import app as app_module

    called = []
    db_url = f"sqlite:///{tmp_path/'startup.db'}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))
    app_module.create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    assert called == []
def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
    from l4d2web import app as app_module

    called = []
    db_url = f"sqlite:///{tmp_path/'startup-enabled.db'}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))
    app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    assert called == [app]


@@ -4,7 +4,7 @@ from pathlib import Path
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, BlueprintOverlay, Job, Overlay, Server, User
@pytest.fixture
@@ -161,16 +161,50 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
    with client.session_transaction() as sess:
        sess["user_id"] = admin_id
    admin_page = client.get("/admin")
    assert admin_page.status_code == 200
    assert 'action="/admin/install"' in admin_page.get_data(as_text=True)
    assert client.get("/admin/users").status_code == 200
    assert client.get("/admin/jobs").status_code == 200
    assert 'href="/admin"' in client.get("/dashboard").get_data(as_text=True)
def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
    db_url = f"sqlite:///{tmp_path/'admin-install.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()
    with session_scope() as session:
        admin = User(username="admin", password_digest=hash_password("secret"), admin=True)
        session.add(admin)
        session.flush()
        admin_id = admin.id
    client = app.test_client()
    with client.session_transaction() as sess:
        sess["user_id"] = admin_id
        sess["csrf_token"] = "test-token"
    response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
    assert response.status_code == 302
    assert response.headers["Location"].endswith("/admin/jobs")
    with session_scope() as session:
        job = session.query(Job).one()
        assert job.user_id == admin_id
        assert job.server_id is None
        assert job.operation == "install"
        assert job.state == "queued"
def test_non_admin_cannot_open_admin_pages(auth_client_with_server) -> None:
    assert auth_client_with_server.get("/admin").status_code == 403
    assert auth_client_with_server.get("/admin/users").status_code == 403
    assert auth_client_with_server.get("/admin/jobs").status_code == 403
    with auth_client_with_server.session_transaction() as sess:
        sess["csrf_token"] = "test-token"
    assert auth_client_with_server.post("/admin/install", headers={"X-CSRF-Token": "test-token"}).status_code == 403
def test_anonymous_protected_page_redirects_to_login_with_next(tmp_path, monkeypatch) -> None: