"""Job routes: detail page, cancellation, and a server-sent-events log stream."""

from collections.abc import Iterator
from datetime import UTC, datetime
import time

from flask import Blueprint, Response, current_app, redirect, render_template, request
from sqlalchemy import select

from l4d2web.auth import current_user, is_safe_next, require_login
from l4d2web.db import session_scope
from l4d2web.models import Job, JobLog, Server, User
from l4d2web.services.job_worker import append_job_log


bp = Blueprint("job", __name__)

# States after which stream_job stops polling for new log lines.
TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}


def format_sse_event(seq: int, event: str, data: str) -> str:
    """Serialize one server-sent event.

    Emits an ``id:`` line, an ``event:`` line, and one ``data:`` line per line
    of *data*, terminated by a blank line. Empty *data* still produces a
    single empty ``data:`` line so the event is well formed.
    """
    lines = [f"id: {seq}", f"event: {event}"]
    lines.extend(f"data: {line}" for line in data.splitlines() or [""])
    return "\n".join(lines) + "\n\n"


def can_access_job(job: Job, user: User) -> bool:
    """Return True if *user* may view or cancel *job*.

    Admins may access every job. A job without an owner (``user_id`` is None)
    is admin-only; otherwise only the owning user has access.
    """
    if user.admin:
        return True
    if job.user_id is None:
        return False
    return job.user_id == user.id


@bp.get("/jobs/<int:job_id>")
@require_login
def job_detail(job_id: int) -> str | Response:
    """Render the detail page for one job.

    Returns 404 when the job does not exist and 403 when the current user may
    not access it.
    """
    user = current_user()
    assert user is not None  # narrows the type for can_access_job

    with session_scope() as db:
        # Outer joins: a job may have no owner and/or no server.
        row = db.execute(
            select(Job, User, Server)
            .outerjoin(User, User.id == Job.user_id)
            .outerjoin(Server, Server.id == Job.server_id)
            .where(Job.id == job_id)
        ).first()
        if row is None:
            return Response(status=404)

        job, owner, server = row
        if not can_access_job(job, user):
            return Response(status=403)

        # Render while the session is still open so the ORM objects stay usable.
        return render_template("job_detail.html", job=job, owner=owner, server=server)


@bp.post("/jobs/<int:job_id>/cancel")
@require_login
def cancel_job(job_id: int) -> Response:
    """Cancel a job, or request cancellation if it is already running.

    A queued job is marked ``cancelled`` immediately (exit code 1). A running
    job is moved to ``cancelling``. A job already in ``cancelling`` is left
    untouched. Any other state yields 409. On success the client is redirected
    to the form's ``next`` URL when ``is_safe_next`` accepts it, otherwise to
    the job detail page.
    """
    user = current_user()
    assert user is not None  # narrows the type for can_access_job

    next_url = request.form.get("next")
    if not is_safe_next(next_url):
        next_url = f"/jobs/{job_id}"

    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return Response(status=404)
        if not can_access_job(job, user):
            return Response(status=403)

        now = datetime.now(UTC)
        if job.state == "queued":
            job.state = "cancelled"
            job.exit_code = 1
            job.finished_at = now
            job.updated_at = now
            append_job_log(db, job.id, "stderr", "job cancelled before execution")
        elif job.state == "running":
            # Only flag the request here; this view does not stop the process itself.
            job.state = "cancelling"
            job.updated_at = now
            append_job_log(
                db,
                job.id,
                "stderr",
                "job cancellation requested; attempting to terminate running process",
            )
        elif job.state == "cancelling":
            # Repeated request: nothing to change.
            return redirect(next_url)
        else:
            return Response("job cannot be cancelled", status=409)

    return redirect(next_url)


@bp.get("/jobs/<int:job_id>/stream")
@require_login
def stream_job(job_id: int) -> Response:
    """Stream a job's log lines as server-sent events.

    The starting sequence number comes from the ``last_seq`` query argument,
    else the ``Last-Event-ID`` header, else 0; only log rows with a greater
    ``seq`` are sent. Returns 400 for a non-integer starting value, 404 when
    the job does not exist, and 403 when the current user may not access it.
    The stream ends once the job is in a terminal state and all of its
    remaining log rows have been sent, or if the job row disappears.
    """
    user = current_user()
    assert user is not None  # narrows the type for can_access_job

    raw_last_seq = request.args.get("last_seq") or request.headers.get("Last-Event-ID") or "0"
    try:
        last_seq = int(raw_last_seq)
    except ValueError:
        # Client-supplied value; reject it rather than fail with a 500.
        return Response("invalid last_seq", status=400)

    # Read config now: the generator body runs later, while the response streams.
    limit = int(current_app.config["JOB_LOG_REPLAY_LIMIT"])
    poll_seconds = float(current_app.config.get("JOB_WORKER_POLL_SECONDS", 1))

    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        if job is None:
            return Response(status=404)
        if not can_access_job(job, user):
            return Response(status=403)

    def generate() -> Iterator[str]:
        """Yield formatted events, polling for new log rows until the job is done."""
        next_seq = last_seq
        while True:
            with session_scope() as db:
                job = db.scalar(select(Job).where(Job.id == job_id))
                if job is None:
                    return
                rows = db.scalars(
                    select(JobLog)
                    .where(JobLog.job_id == job_id, JobLog.seq > next_seq)
                    .order_by(JobLog.seq)
                    .limit(limit)
                ).all()
                terminal = job.state in TERMINAL_JOB_STATES
                # Copy to plain tuples so no session is held open (and no ORM
                # attribute is read after it closes) while the client consumes.
                events = [(row.seq, row.stream, row.line) for row in rows]

            for seq, stream, line in events:
                next_seq = seq
                yield format_sse_event(seq, stream, line)

            if events and len(events) >= limit:
                # A full batch means more rows may be waiting; fetch them
                # immediately instead of throttling replay to one batch per poll.
                continue
            if terminal and len(events) < limit:
                return
            time.sleep(poll_seconds)

    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )