"""Tests for job-log sequencing, persistence and SSE replay of job logs."""

from pathlib import Path

import pytest
from sqlalchemy import text

from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import get_engine, init_db, session_scope
from l4d2web.models import Job, JobLog, User
from l4d2web.services.job_worker import append_job_log


@pytest.fixture
def seeded_job_logs(tmp_path, monkeypatch):
    """Build an app on a fresh SQLite file seeded with one user, job and logs.

    The job receives seven ``stdout`` log rows with ``seq`` 1 through 7 and
    lines ``line-1`` .. ``line-7``.

    Returns:
        A ``(app, job_id, user_id)`` tuple.
    """
    db_url = f"sqlite:///{tmp_path/'joblogs.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()

    with session_scope() as session:
        user = User(username="alice", password_digest=hash_password("secret"), admin=False)
        session.add(user)
        session.flush()  # flush so user.id is populated before it is referenced

        job = Job(user_id=user.id, server_id=None, operation="install", state="succeeded")
        session.add(job)
        session.flush()  # flush so job.id is populated before the log rows use it

        for idx in range(1, 8):
            session.add(JobLog(job_id=job.id, seq=idx, stream="stdout", line=f"line-{idx}"))

        # Copy the primary keys to plain values while the session is still open.
        job_id = job.id
        user_id = user.id

    return app, job_id, user_id


def test_job_logs_seq_monotonic(seeded_job_logs) -> None:
    """Stored ``seq`` values for the seeded job are strictly increasing."""
    app, job_id, _ = seeded_job_logs
    with app.app_context():
        with get_engine().connect() as conn:
            rows = conn.execute(
                text("select seq from job_logs where job_id=:id order by seq"),
                {"id": job_id},
            ).all()

    values = [row[0] for row in rows]
    # The query already orders by seq, so comparing against sorted(values)
    # could never fail. Require rows to exist and forbid duplicates instead.
    assert values, "expected the fixture to have seeded job log rows"
    assert all(earlier < later for earlier, later in zip(values, values[1:]))


def test_append_job_log_increments_seq(seeded_job_logs) -> None:
    """Appending to a job that has seq 1-7 yields a newest row with seq 8."""
    app, job_id, _ = seeded_job_logs
    with app.app_context():
        with session_scope() as session:
            append_job_log(session, job_id=job_id, stream="stdout", line="new line")

        with session_scope() as session:
            last = (
                session.query(JobLog)
                .filter(JobLog.job_id == job_id)
                .order_by(JobLog.seq.desc())
                .first()
            )
            assert last is not None
            assert last.seq == 8


def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
    """A logged-in request to the stream endpoint with ``last_seq`` returns 200."""
    app, job_id, user_id = seeded_job_logs
    client = app.test_client()
    with client.session_transaction() as sess:
        sess["user_id"] = user_id

    response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
    assert response.status_code == 200


def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
    """With ``last_seq=5`` the stream carries event 6 onward and omits line 5."""
    app, job_id, user_id = seeded_job_logs
    client = app.test_client()
    with client.session_transaction() as sess:
        sess["user_id"] = user_id

    response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
    # Named ``body`` rather than ``text`` to avoid shadowing sqlalchemy.text.
    body = response.get_data(as_text=True)

    assert "id: 6\n" in body
    assert "event: stdout\n" in body
    assert "data: line-6\n\n" in body
    assert "data: line-5\n\n" not in body


def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
    """A ``Last-Event-ID: 6`` header resumes the stream after line 6."""
    app, job_id, user_id = seeded_job_logs
    client = app.test_client()
    with client.session_transaction() as sess:
        sess["user_id"] = user_id

    response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
    body = response.get_data(as_text=True)

    assert "data: line-7\n\n" in body
    assert "data: line-6\n\n" not in body


def test_sse_js_handles_job_log_custom_events() -> None:
    """The static SSE script registers listeners for stdout and stderr events."""
    # NOTE: the path is relative to the current working directory of the run.
    js = Path("l4d2web/static/js/sse.js").read_text(encoding="utf-8")
    assert 'addEventListener("stdout"' in js
    assert 'addEventListener("stderr"' in js


def test_system_job_logs_persist(tmp_path, monkeypatch) -> None:
    """A job with no owning user can store a log line, which gets seq 1."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'system-job-logs.db'}")
    init_db()

    with session_scope() as db:
        job = Job(
            user_id=None,
            server_id=None,
            operation="refresh_global_overlays",
            state="queued",
        )
        db.add(job)
        db.flush()  # flush so job.id is populated before logging against it

        seq = append_job_log(db, job.id, "stdout", "queued by system timer")
        db.flush()

        row = db.query(JobLog).filter_by(job_id=job.id).one()
        assert seq == 1
        assert row.line == "queued by system timer"