left4me/l4d2web/tests/test_job_logs.py
mwiegand 49992b3a26
refactor(repo): uv workspace + hatchling + layout restructure
Migrate from pip-install-e + setuptools to a uv workspace with a
committed uv.lock for deterministic deps. Switch both members to
hatchling, and move package sources into nested standard layout
(l4d2host/l4d2host/, l4d2web/l4d2web/) so builds work from a
read-only source tree — setuptools wrote egg-info to source under
the old layout, which broke uv sync on the root-owned /opt/left4me/src.

Local dev install: `pip install -e ./l4d2host -e ./l4d2web` -> `uv sync`.
.envrc switches from `layout python python3.13` to `use uv`. Python
pinned to 3.13 via .python-version.

l4d2web now declares its cross-dep on l4d2host explicitly via
[tool.uv.sources] (workspace = true). l4d2web/alembic.ini and
l4d2web/alembic/ stay at the project root (standard alembic layout).

Test fixes:
- tests/__init__.py added to both test dirs so pytest doesn't shadow
  l4d2host as a namespace package via outer-dir walk.
- 3 CWD-relative paths in tests (l4d2web/static/css/{tokens,layout}.css
  and js/sse.js) anchored to Path(__file__) so they survive layout
  changes.
- Two test_install.py tests now monkeypatch HOME to tmp_path so they
  stop silently mutating ~/.steam/sdk32 on every run.

628 tests pass under sandboxed `uv run pytest`.

Per docs/superpowers/plans/2026-05-15-uv-workspace-execution.md;
prereq for the ckn-bw bundle's uv-sync action (queued).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 22:04:29 +02:00

135 lines
4.3 KiB
Python

from pathlib import Path
from datetime import UTC, datetime
from sqlalchemy import text
import pytest
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import get_engine, init_db, session_scope
from l4d2web.models import Job, JobLog, User
from l4d2web.services.job_worker import append_job_log
@pytest.fixture
def seeded_job_logs(tmp_path, monkeypatch):
    """Build an app on a fresh SQLite file seeded with one job and seven log lines.

    The database lives under ``tmp_path`` and its URL is exported through the
    ``DATABASE_URL`` environment variable as well as the app config.

    Returns:
        A ``(app, job_id, user_id)`` tuple: the object returned by
        ``create_app``, the id of the seeded job, and the id of the user the
        job was created for. The job's log lines are ``line-1`` .. ``line-7``
        on the ``stdout`` stream with ``seq`` 1..7.
    """
    database_url = f"sqlite:///{tmp_path/'joblogs.db'}"
    monkeypatch.setenv("DATABASE_URL", database_url)
    app = create_app({"TESTING": True, "DATABASE_URL": database_url, "SECRET_KEY": "test"})
    init_db()

    with session_scope() as session:
        owner = User(username="alice", password_digest=hash_password("secret"), admin=False)
        session.add(owner)
        session.flush()  # flush before owner.id is read below

        seeded_job = Job(user_id=owner.id, server_id=None, operation="install", state="succeeded")
        session.add(seeded_job)
        session.flush()  # flush before seeded_job.id is read below

        session.add_all(
            [
                JobLog(job_id=seeded_job.id, seq=number, stream="stdout", line=f"line-{number}")
                for number in range(1, 8)
            ]
        )

        # Copy the plain ids out while the session is still open.
        job_id, user_id = seeded_job.id, owner.id

    return app, job_id, user_id
def test_job_logs_seq_monotonic(seeded_job_logs) -> None:
    """Seeded job log rows carry strictly increasing ``seq`` values.

    The query orders by ``seq``, so comparing the result with ``sorted(...)``
    could never fail (and would also pass on an empty result). Instead the
    test requires at least one row and that no ``seq`` value repeats.
    """
    app, job_id, _ = seeded_job_logs
    with app.app_context():
        with get_engine().connect() as conn:
            rows = conn.execute(
                text("select seq from job_logs where job_id=:id order by seq"),
                {"id": job_id},
            ).all()

    values = [row[0] for row in rows]
    assert values, "expected the seeded job to have log rows"
    # Adjacent pairs of an ordered result must differ, i.e. no duplicate seq.
    assert all(earlier < later for earlier, later in zip(values, values[1:])), values
def test_append_job_log_increments_seq(seeded_job_logs) -> None:
    """Appending one line to the job seeded with seq 1..7 yields a row with seq 8."""
    app, job_id, _ = seeded_job_logs
    with app.app_context():
        # Write in one session scope, then read back in a separate one.
        with session_scope() as writer:
            append_job_log(writer, job_id=job_id, stream="stdout", line="new line")

        with session_scope() as reader:
            newest_first = (
                reader.query(JobLog)
                .filter(JobLog.job_id == job_id)
                .order_by(JobLog.seq.desc())
            )
            newest = newest_first.first()
            assert newest is not None
            assert newest.seq == 8
def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
    """The job stream endpoint answers 200 when called with ``last_seq=5``.

    The client session is primed with ``user_id`` and ``pw_changed_at`` before
    the request is made.
    """
    app, job_id, user_id = seeded_job_logs
    client = app.test_client()

    with client.session_transaction() as flask_session:
        flask_session["user_id"] = user_id
        flask_session["pw_changed_at"] = datetime.now(UTC).isoformat()

    stream_url = f"/jobs/{job_id}/stream?last_seq=5"
    response = client.get(stream_url)

    assert response.status_code == 200
def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
    """With ``last_seq=5`` the stream body contains line 6 as a ``stdout`` event but not line 5."""
    app, job_id, user_id = seeded_job_logs
    client = app.test_client()

    with client.session_transaction() as flask_session:
        flask_session["user_id"] = user_id
        flask_session["pw_changed_at"] = datetime.now(UTC).isoformat()

    response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
    body = response.get_data(as_text=True)

    # Each expected fragment of the event framing must appear in the body.
    for fragment in ("id: 6\n", "event: stdout\n", "data: line-6\n\n"):
        assert fragment in body
    assert "data: line-5\n\n" not in body
def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
    """A ``Last-Event-ID: 6`` request header yields line 7 in the body but not line 6."""
    app, job_id, user_id = seeded_job_logs
    client = app.test_client()

    with client.session_transaction() as flask_session:
        flask_session["user_id"] = user_id
        flask_session["pw_changed_at"] = datetime.now(UTC).isoformat()

    resume_headers = {"Last-Event-ID": "6"}
    response = client.get(f"/jobs/{job_id}/stream", headers=resume_headers)
    body = response.get_data(as_text=True)

    assert "data: line-7\n\n" in body
    assert "data: line-6\n\n" not in body
def test_sse_js_handles_job_log_custom_events() -> None:
    """``sse.js`` contains listener registrations for the ``stdout`` and ``stderr`` events."""
    # Anchor on this test file so the lookup does not depend on the working directory.
    sse_js_path = Path(__file__).resolve().parents[1] / "l4d2web" / "static" / "js" / "sse.js"
    # Decode explicitly as UTF-8 instead of relying on the platform's locale encoding.
    js = sse_js_path.read_text(encoding="utf-8")

    assert 'addEventListener("stdout"' in js
    assert 'addEventListener("stderr"' in js
def test_system_job_logs_persist(tmp_path, monkeypatch) -> None:
    """A job created with ``user_id=None`` can have a log line appended and read back.

    Uses a fresh SQLite file under ``tmp_path`` selected via the
    ``DATABASE_URL`` environment variable. ``Job``, ``JobLog`` and
    ``append_job_log`` come from the module-level imports.
    """
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'system-job-logs.db'}")
    init_db()

    with session_scope() as db:
        job = Job(
            user_id=None,
            server_id=None,
            operation="refresh_workshop_items",
            state="queued",
        )
        db.add(job)
        db.flush()  # flush before job.id is read below

        seq = append_job_log(db, job_id=job.id, stream="stdout", line="queued by system timer")
        db.flush()

        row = db.query(JobLog).filter_by(job_id=job.id).one()
        # First line logged for a brand-new job is expected to get seq 1.
        assert seq == 1
        assert row.line == "queued by system timer"