From 271b2d347c8e66d465698fd53dae477c3c408312 Mon Sep 17 00:00:00 2001 From: mwiegand Date: Thu, 23 Apr 2026 01:15:12 +0200 Subject: [PATCH] feat(l4d2-web): persist command logs and stream them with sse --- components/l4d2-web-app/src/l4d2web/app.py | 2 + .../src/l4d2web/routes/job_routes.py | 39 +++++++++++ .../src/l4d2web/services/job_worker.py | 19 ++++- .../l4d2-web-app/tests/test_job_logs.py | 69 +++++++++++++++++++ 4 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 components/l4d2-web-app/src/l4d2web/routes/job_routes.py create mode 100644 components/l4d2-web-app/tests/test_job_logs.py diff --git a/components/l4d2-web-app/src/l4d2web/app.py b/components/l4d2-web-app/src/l4d2web/app.py index 4919230..8d71352 100644 --- a/components/l4d2-web-app/src/l4d2web/app.py +++ b/components/l4d2-web-app/src/l4d2web/app.py @@ -8,6 +8,7 @@ from l4d2web.config import DEFAULT_CONFIG from l4d2web.db import init_db from l4d2web.routes.blueprint_routes import bp as blueprint_bp from l4d2web.routes.auth_routes import bp as auth_bp +from l4d2web.routes.job_routes import bp as job_bp from l4d2web.routes.overlay_routes import bp as overlay_bp from l4d2web.routes.server_routes import bp as server_bp from l4d2web.services.job_worker import recover_stale_jobs @@ -27,6 +28,7 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: app.register_blueprint(overlay_bp) app.register_blueprint(blueprint_bp) app.register_blueprint(server_bp) + app.register_blueprint(job_bp) register_cli(app) recover_stale_jobs() diff --git a/components/l4d2-web-app/src/l4d2web/routes/job_routes.py b/components/l4d2-web-app/src/l4d2web/routes/job_routes.py new file mode 100644 index 0000000..4fdcd22 --- /dev/null +++ b/components/l4d2-web-app/src/l4d2web/routes/job_routes.py @@ -0,0 +1,39 @@ +from flask import Blueprint, Response, current_app, request +from sqlalchemy import select + +from l4d2web.auth import current_user, require_login +from l4d2web.db import session_scope +from l4d2web.models import Job, JobLog + + +bp = Blueprint("job", __name__) + + +@bp.get("/jobs//stream") +@require_login +def stream_job(job_id: int) -> Response: + user = current_user() + assert user is not None + + last_seq = int(request.args.get("last_seq", "0")) + limit = int(current_app.config["JOB_LOG_REPLAY_LIMIT"]) + + with session_scope() as db: + job = db.scalar(select(Job).where(Job.id == job_id, Job.user_id == user.id)) + if job is None: + return Response(status=404) + + def generate(): + with session_scope() as db: + rows = db.scalars( + select(JobLog) + .where(JobLog.job_id == job_id, JobLog.seq > last_seq) + .order_by(JobLog.seq) + .limit(limit) + ).all() + for row in rows: + yield f"id: {row.seq}\n" + yield f"event: {row.stream}\n" + yield f"data: {row.line}\n\n" + + return Response(generate(), mimetype="text/event-stream") diff --git a/components/l4d2-web-app/src/l4d2web/services/job_worker.py b/components/l4d2-web-app/src/l4d2web/services/job_worker.py index be6a7ff..cce08d7 100644 --- a/components/l4d2-web-app/src/l4d2web/services/job_worker.py +++ b/components/l4d2-web-app/src/l4d2web/services/job_worker.py @@ -1,10 +1,11 @@ from dataclasses import dataclass, field from datetime import UTC, datetime -from sqlalchemy import select +from sqlalchemy import func, select +from sqlalchemy.orm import Session from l4d2web.db import session_scope -from l4d2web.models import Job +from l4d2web.models import Job, JobLog @dataclass @@ -32,3 +33,17 @@ def recover_stale_jobs() -> int: job.finished_at = now job.updated_at = now return len(jobs) + + +def append_job_log( + session: Session, + job_id: int, + stream: str, + line: str, + max_chars: int = 4096, +) -> int: + last_seq = session.scalar(select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)) or 0 + next_seq = int(last_seq) + 1 + session.add(JobLog(job_id=job_id, seq=next_seq, stream=stream, line=line[:max_chars])) + session.flush() + return next_seq diff --git a/components/l4d2-web-app/tests/test_job_logs.py b/components/l4d2-web-app/tests/test_job_logs.py new file mode 100644 index 0000000..cb88b55 --- /dev/null +++ b/components/l4d2-web-app/tests/test_job_logs.py @@ -0,0 +1,69 @@ +from sqlalchemy import text + +import pytest + +from l4d2web.app import create_app +from l4d2web.auth import hash_password +from l4d2web.db import get_engine, init_db, session_scope +from l4d2web.models import Job, JobLog, User +from l4d2web.services.job_worker import append_job_log + + +@pytest.fixture +def seeded_job_logs(tmp_path, monkeypatch): + db_url = f"sqlite:///{tmp_path/'joblogs.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + + with session_scope() as session: + user = User(username="alice", password_digest=hash_password("secret"), admin=False) + session.add(user) + session.flush() + + job = Job(user_id=user.id, server_id=None, operation="install", state="queued") + session.add(job) + session.flush() + + for idx in range(1, 8): + session.add(JobLog(job_id=job.id, seq=idx, stream="stdout", line=f"line-{idx}")) + + job_id = job.id + user_id = user.id + + return app, job_id, user_id + + +def test_job_logs_seq_monotonic(seeded_job_logs) -> None: + app, job_id, _ = seeded_job_logs + with app.app_context(): + with get_engine().connect() as conn: + rows = conn.execute( + text("select seq from job_logs where job_id=:id order by seq"), + {"id": job_id}, + ).all() + + values = [row[0] for row in rows] + assert values == sorted(values) + + +def test_append_job_log_increments_seq(seeded_job_logs) -> None: + app, job_id, _ = seeded_job_logs + + with app.app_context(): + with session_scope() as session: + append_job_log(session, job_id=job_id, stream="stdout", line="new line") + with session_scope() as session: + last = session.query(JobLog).filter(JobLog.job_id == job_id).order_by(JobLog.seq.desc()).first() + assert last is not None + assert last.seq == 8 + + +def test_sse_resume_from_last_seq(seeded_job_logs) -> None: + app, job_id, user_id = seeded_job_logs + client = app.test_client() + with client.session_transaction() as sess: + sess["user_id"] = user_id + + response = client.get(f"/jobs/{job_id}/stream?last_seq=5") + assert response.status_code == 200