Migrate from pip-install-e + setuptools to a uv workspace with a
committed uv.lock for deterministic deps. Switch both members to
hatchling, and move package sources into nested standard layout
(l4d2host/l4d2host/, l4d2web/l4d2web/) so builds work from a
read-only source tree — setuptools wrote egg-info to source under
the old layout, which broke uv sync on the root-owned /opt/left4me/src.
Local dev install: `pip install -e ./l4d2host -e ./l4d2web` -> `uv sync`.
.envrc switches from `layout python python3.13` to `use uv`. Python
pinned to 3.13 via .python-version.
l4d2web now declares its cross-dep on l4d2host explicitly via
[tool.uv.sources] (workspace = true). l4d2web/alembic.ini and
l4d2web/alembic/ stay at the project root (standard alembic layout).
Test fixes:
- tests/__init__.py added to both test dirs so pytest doesn't shadow
l4d2host as a namespace package via outer-dir walk.
- 3 CWD-relative paths in tests (l4d2web/static/css/{tokens,layout}.css
and js/sse.js) anchored to Path(__file__) so they survive layout
changes.
- Two test_install.py tests now monkeypatch HOME to tmp_path so they
stop silently mutating ~/.steam/sdk32 on every run.
628 tests pass under sandboxed `uv run pytest`.
Per docs/superpowers/plans/2026-05-15-uv-workspace-execution.md;
prereq for the ckn-bw bundle's uv-sync action (queued).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
from pathlib import Path
|
|
from datetime import UTC, datetime
|
|
|
|
from sqlalchemy import text
|
|
|
|
import pytest
|
|
|
|
from l4d2web.app import create_app
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import get_engine, init_db, session_scope
|
|
from l4d2web.models import Job, JobLog, User
|
|
from l4d2web.services.job_worker import append_job_log
|
|
|
|
|
|
@pytest.fixture
|
|
def seeded_job_logs(tmp_path, monkeypatch):
|
|
db_url = f"sqlite:///{tmp_path/'joblogs.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
|
|
with session_scope() as session:
|
|
user = User(username="alice", password_digest=hash_password("secret"), admin=False)
|
|
session.add(user)
|
|
session.flush()
|
|
|
|
job = Job(user_id=user.id, server_id=None, operation="install", state="succeeded")
|
|
session.add(job)
|
|
session.flush()
|
|
|
|
for idx in range(1, 8):
|
|
session.add(JobLog(job_id=job.id, seq=idx, stream="stdout", line=f"line-{idx}"))
|
|
|
|
job_id = job.id
|
|
user_id = user.id
|
|
|
|
return app, job_id, user_id
|
|
|
|
|
|
def test_job_logs_seq_monotonic(seeded_job_logs) -> None:
|
|
app, job_id, _ = seeded_job_logs
|
|
with app.app_context():
|
|
with get_engine().connect() as conn:
|
|
rows = conn.execute(
|
|
text("select seq from job_logs where job_id=:id order by seq"),
|
|
{"id": job_id},
|
|
).all()
|
|
|
|
values = [row[0] for row in rows]
|
|
assert values == sorted(values)
|
|
|
|
|
|
def test_append_job_log_increments_seq(seeded_job_logs) -> None:
|
|
app, job_id, _ = seeded_job_logs
|
|
|
|
with app.app_context():
|
|
with session_scope() as session:
|
|
append_job_log(session, job_id=job_id, stream="stdout", line="new line")
|
|
with session_scope() as session:
|
|
last = session.query(JobLog).filter(JobLog.job_id == job_id).order_by(JobLog.seq.desc()).first()
|
|
assert last is not None
|
|
assert last.seq == 8
|
|
|
|
|
|
def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
|
|
app, job_id, user_id = seeded_job_logs
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = user_id
|
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
|
|
|
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
|
|
app, job_id, user_id = seeded_job_logs
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = user_id
|
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
|
|
|
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
|
text = response.get_data(as_text=True)
|
|
|
|
assert "id: 6\n" in text
|
|
assert "event: stdout\n" in text
|
|
assert "data: line-6\n\n" in text
|
|
assert "data: line-5\n\n" not in text
|
|
|
|
|
|
def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
|
|
app, job_id, user_id = seeded_job_logs
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = user_id
|
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
|
|
|
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
|
|
text = response.get_data(as_text=True)
|
|
|
|
assert "data: line-7\n\n" in text
|
|
assert "data: line-6\n\n" not in text
|
|
|
|
|
|
def test_sse_js_handles_job_log_custom_events() -> None:
|
|
js = (Path(__file__).resolve().parents[1] / "l4d2web" / "static" / "js" / "sse.js").read_text()
|
|
|
|
assert 'addEventListener("stdout"' in js
|
|
assert 'addEventListener("stderr"' in js
|
|
|
|
|
|
def test_system_job_logs_persist(tmp_path, monkeypatch):
|
|
from l4d2web.models import Job, JobLog
|
|
from l4d2web.services.job_worker import append_job_log
|
|
|
|
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'system-job-logs.db'}")
|
|
init_db()
|
|
|
|
with session_scope() as db:
|
|
job = Job(
|
|
user_id=None,
|
|
server_id=None,
|
|
operation="refresh_workshop_items",
|
|
state="queued",
|
|
)
|
|
db.add(job)
|
|
db.flush()
|
|
|
|
seq = append_job_log(db, job.id, "stdout", "queued by system timer")
|
|
db.flush()
|
|
|
|
row = db.query(JobLog).filter_by(job_id=job.id).one()
|
|
assert seq == 1
|
|
assert row.line == "queued by system timer"
|