left4me/l4d2web/routes/job_routes.py
mwiegand 92d6ebbe82
feat(l4d2-web): managed global map overlays with daily refresh
Adds two managed system overlays (l4d2center-maps, cedapug-maps) that
fetch curated map archives from upstream sources and reconcile addons
symlinks for non-Steam maps. A daily systemd timer enqueues a coalesced
refresh_global_overlays worker job; downloads, extraction, and rebuilds
run in the existing job worker and surface in the job log UI.

Schema: GlobalOverlaySource / GlobalOverlayItem / GlobalOverlayItemFile
plus nullable Job.user_id so system jobs render as "system" in the UI.
The new builder reconciles symlinks against the per-source vpk cache
and leaves foreign symlinks untouched. Initialize-time guard refuses
to mount a partial overlay if any expected vpk is missing from cache.

Refresh service uses shutil.move to handle EXDEV when /tmp and the
cache live on different filesystems.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 08:05:14 +02:00

129 lines
4.1 KiB
Python

from datetime import UTC, datetime
import time
from flask import Blueprint, Response, current_app, redirect, render_template, request
from sqlalchemy import select
from l4d2web.auth import current_user, is_safe_next, require_login
from l4d2web.db import session_scope
from l4d2web.models import Job, JobLog, Server, User
from l4d2web.services.job_worker import append_job_log
bp = Blueprint("job", __name__)
TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
def format_sse_event(seq: int, event: str, data: str) -> str:
    """Serialize one Server-Sent Events frame.

    Emits an ``id:`` line (so clients can resume via Last-Event-ID), an
    ``event:`` line, and one ``data:`` line per line of *data*.  An empty
    payload still produces a single blank ``data:`` line so the frame is
    well-formed.  The trailing blank line terminates the event.
    """
    body = data.splitlines()
    if not body:
        body = [""]
    parts = [f"id: {seq}", f"event: {event}"]
    parts.extend(f"data: {line}" for line in body)
    return "\n".join(parts) + "\n\n"
def can_access_job(job: Job, user: User) -> bool:
    """Return True if *user* may view or manage *job*.

    Admins can access every job.  Jobs with no owner (``user_id is None``,
    i.e. system jobs) are visible to admins only; otherwise access requires
    that the job belong to the requesting user.
    """
    if user.admin:
        return True
    # System jobs (no owner) short-circuit to False for non-admins.
    return job.user_id is not None and job.user_id == user.id
@bp.get("/jobs/<int:job_id>")
@require_login
def job_detail(job_id: int) -> str | Response:
user = current_user()
assert user is not None
with session_scope() as db:
row = db.execute(
select(Job, User, Server)
.outerjoin(User, User.id == Job.user_id)
.outerjoin(Server, Server.id == Job.server_id)
.where(Job.id == job_id)
).first()
if row is None:
return Response(status=404)
job, owner, server = row
if not can_access_job(job, user):
return Response(status=403)
return render_template("job_detail.html", job=job, owner=owner, server=server)
@bp.post("/jobs/<int:job_id>/cancel")
@require_login
def cancel_job(job_id: int) -> Response:
user = current_user()
assert user is not None
next_url = request.form.get("next")
if not is_safe_next(next_url):
next_url = f"/jobs/{job_id}"
with session_scope() as db:
job = db.scalar(select(Job).where(Job.id == job_id))
if job is None:
return Response(status=404)
if not can_access_job(job, user):
return Response(status=403)
now = datetime.now(UTC)
if job.state == "queued":
job.state = "cancelled"
job.exit_code = 1
job.finished_at = now
job.updated_at = now
append_job_log(db, job.id, "stderr", "job cancelled before execution")
elif job.state == "running":
job.state = "cancelling"
job.updated_at = now
append_job_log(db, job.id, "stderr", "job cancellation requested; attempting to terminate running process")
elif job.state == "cancelling":
return redirect(next_url)
else:
return Response("job cannot be cancelled", status=409)
return redirect(next_url)
@bp.get("/jobs/<int:job_id>/stream")
@require_login
def stream_job(job_id: int) -> Response:
user = current_user()
assert user is not None
last_seq = int(request.args.get("last_seq") or request.headers.get("Last-Event-ID") or "0")
limit = int(current_app.config["JOB_LOG_REPLAY_LIMIT"])
poll_seconds = float(current_app.config.get("JOB_WORKER_POLL_SECONDS", 1))
with session_scope() as db:
job = db.scalar(select(Job).where(Job.id == job_id))
if job is None:
return Response(status=404)
if not can_access_job(job, user):
return Response(status=403)
def generate():
next_seq = last_seq
while True:
with session_scope() as db:
job = db.scalar(select(Job).where(Job.id == job_id))
if job is None:
return
rows = db.scalars(
select(JobLog)
.where(JobLog.job_id == job_id, JobLog.seq > next_seq)
.order_by(JobLog.seq)
.limit(limit)
).all()
terminal = job.state in TERMINAL_JOB_STATES
for row in rows:
next_seq = row.seq
yield format_sse_event(row.seq, row.stream, row.line)
if terminal and len(rows) < limit:
return
time.sleep(poll_seconds)
return Response(generate(), mimetype="text/event-stream")