feat(l4d2-web): persist command logs and stream them with SSE
This commit is contained in:
parent
fd320879c8
commit
271b2d347c
4 changed files with 127 additions and 2 deletions
|
|
@ -8,6 +8,7 @@ from l4d2web.config import DEFAULT_CONFIG
|
|||
from l4d2web.db import init_db
|
||||
from l4d2web.routes.blueprint_routes import bp as blueprint_bp
|
||||
from l4d2web.routes.auth_routes import bp as auth_bp
|
||||
from l4d2web.routes.job_routes import bp as job_bp
|
||||
from l4d2web.routes.overlay_routes import bp as overlay_bp
|
||||
from l4d2web.routes.server_routes import bp as server_bp
|
||||
from l4d2web.services.job_worker import recover_stale_jobs
|
||||
|
|
@ -27,6 +28,7 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
|||
app.register_blueprint(overlay_bp)
|
||||
app.register_blueprint(blueprint_bp)
|
||||
app.register_blueprint(server_bp)
|
||||
app.register_blueprint(job_bp)
|
||||
register_cli(app)
|
||||
recover_stale_jobs()
|
||||
|
||||
|
|
|
|||
39
components/l4d2-web-app/src/l4d2web/routes/job_routes.py
Normal file
39
components/l4d2-web-app/src/l4d2web/routes/job_routes.py
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
from flask import Blueprint, Response, current_app, request
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import current_user, require_login
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import Job, JobLog
|
||||
|
||||
|
||||
bp = Blueprint("job", __name__)
|
||||
|
||||
|
||||
@bp.get("/jobs/<int:job_id>/stream")
@require_login
def stream_job(job_id: int) -> Response:
    """Replay persisted log lines for one of the caller's jobs as Server-Sent Events.

    Resume cursor:
        ``Last-Event-ID`` header (sent automatically by EventSource on
        reconnect) takes precedence; otherwise the ``last_seq`` query
        parameter is used (default ``0``, i.e. replay from the start).

    Returns:
        400 when the resume cursor is not an integer,
        404 when the job does not exist or belongs to another user,
        otherwise a ``text/event-stream`` response replaying up to
        ``JOB_LOG_REPLAY_LIMIT`` rows ordered by ``seq``.
    """
    user = current_user()
    assert user is not None

    # Prefer the standard SSE reconnect header; fall back to the explicit
    # query parameter used on first connect. Reject non-numeric cursors
    # with 400 instead of letting int() raise and produce a 500.
    raw_cursor = request.headers.get("Last-Event-ID") or request.args.get("last_seq", "0")
    try:
        last_seq = int(raw_cursor)
    except ValueError:
        return Response(status=400)

    limit = int(current_app.config["JOB_LOG_REPLAY_LIMIT"])

    # Authorize before streaming: only the owning user may read the log.
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id, Job.user_id == user.id))
        if job is None:
            return Response(status=404)

    def generate():
        # A fresh session is opened inside the generator because the body
        # is consumed after the request handler has already returned.
        with session_scope() as db:
            rows = db.scalars(
                select(JobLog)
                .where(JobLog.job_id == job_id, JobLog.seq > last_seq)
                .order_by(JobLog.seq)
                .limit(limit)
            ).all()
            for row in rows:
                # One SSE frame per log row; the `id:` field is what lets
                # clients resume from their last received sequence number.
                yield f"id: {row.seq}\n"
                yield f"event: {row.stream}\n"
                yield f"data: {row.line}\n\n"

    return Response(generate(), mimetype="text/event-stream")
|
||||
|
|
@ -1,10 +1,11 @@
|
|||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import Job
|
||||
from l4d2web.models import Job, JobLog
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -32,3 +33,17 @@ def recover_stale_jobs() -> int:
|
|||
job.finished_at = now
|
||||
job.updated_at = now
|
||||
return len(jobs)
|
||||
|
||||
|
||||
def append_job_log(
    session: Session,
    job_id: int,
    stream: str,
    line: str,
    max_chars: int = 4096,
) -> int:
    """Append one log line for *job_id* and return its sequence number.

    Sequence numbers are dense per job: the next value is ``max(seq) + 1``,
    starting at 1 for a job with no log rows yet. Lines longer than
    *max_chars* are truncated before storage. The row is flushed so the
    allocated ``seq`` is visible within the caller's transaction.
    """
    current_max = session.scalar(
        select(func.max(JobLog.seq)).where(JobLog.job_id == job_id)
    )
    seq = int(current_max or 0) + 1
    entry = JobLog(job_id=job_id, seq=seq, stream=stream, line=line[:max_chars])
    session.add(entry)
    session.flush()
    return seq
|
||||
|
|
|
|||
69
components/l4d2-web-app/tests/test_job_logs.py
Normal file
69
components/l4d2-web-app/tests/test_job_logs.py
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
from sqlalchemy import text
|
||||
|
||||
import pytest
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.db import get_engine, init_db, session_scope
|
||||
from l4d2web.models import Job, JobLog, User
|
||||
from l4d2web.services.job_worker import append_job_log
|
||||
|
||||
|
||||
@pytest.fixture
def seeded_job_logs(tmp_path, monkeypatch):
    """App plus one user owning a queued job seeded with stdout rows seq 1..7."""
    db_url = f"sqlite:///{tmp_path / 'joblogs.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()

    with session_scope() as session:
        user = User(username="alice", password_digest=hash_password("secret"), admin=False)
        session.add(user)
        session.flush()  # materialize user.id for the job row

        job = Job(user_id=user.id, server_id=None, operation="install", state="queued")
        session.add(job)
        session.flush()  # materialize job.id for the log rows

        session.add_all(
            JobLog(job_id=job.id, seq=idx, stream="stdout", line=f"line-{idx}")
            for idx in range(1, 8)
        )

        # Capture the ids before the session closes and the objects expire.
        job_id, user_id = job.id, user.id

    return app, job_id, user_id
|
||||
|
||||
|
||||
def test_job_logs_seq_monotonic(seeded_job_logs) -> None:
    """Seq values persisted for a job come back in non-decreasing order."""
    app, job_id, _ = seeded_job_logs
    with app.app_context(), get_engine().connect() as conn:
        result = conn.execute(
            text("select seq from job_logs where job_id=:id order by seq"),
            {"id": job_id},
        )
        seqs = [seq for (seq,) in result.all()]

    assert seqs == sorted(seqs)
|
||||
|
||||
|
||||
def test_append_job_log_increments_seq(seeded_job_logs) -> None:
    """append_job_log allocates max(seq) + 1 — i.e. 8 after the seeded 1..7."""
    app, job_id, _ = seeded_job_logs

    with app.app_context():
        with session_scope() as session:
            append_job_log(session, job_id=job_id, stream="stdout", line="new line")
        with session_scope() as session:
            newest = (
                session.query(JobLog)
                .filter(JobLog.job_id == job_id)
                .order_by(JobLog.seq.desc())
                .first()
            )
            assert newest is not None
            assert newest.seq == 8
|
||||
|
||||
|
||||
def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
|
||||
app, job_id, user_id = seeded_job_logs
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||
assert response.status_code == 200
|
||||
Loading…
Reference in a new issue