cli: add workshop-refresh subcommand for scheduled global refresh

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
mwiegand 2026-05-11 23:15:05 +02:00
parent 653e3212b9
commit 0ab54b4a7d
No known key found for this signature in database
2 changed files with 79 additions and 1 deletions

View file

@ -7,7 +7,7 @@ from sqlalchemy import select
from l4d2web.auth import hash_password, validate_new_password
from l4d2web.db import session_scope
from l4d2web.models import Overlay, User
from l4d2web.models import Job, Overlay, User
from l4d2web.services.overlay_creation import (
create_overlay_directory,
generate_overlay_path,
@ -90,7 +90,38 @@ def seed_script_overlays(directory: Path) -> None:
click.echo(f"created {name} (id={overlay.id})")
@click.command("workshop-refresh")
def workshop_refresh() -> None:
"""Enqueue a global workshop refresh job. Idempotent: if a refresh is
already queued or running, prints its id and exits 0."""
with session_scope() as db:
existing = db.scalar(
select(Job)
.where(
Job.operation == "refresh_workshop_items",
Job.state.in_(("queued", "running", "cancelling")),
)
.order_by(Job.id.desc())
.limit(1)
)
if existing is not None:
click.echo(
f"refresh_workshop_items job {existing.id} already {existing.state}"
)
return
job = Job(
user_id=None,
server_id=None,
operation="refresh_workshop_items",
state="queued",
)
db.add(job)
db.flush()
click.echo(f"enqueued refresh_workshop_items job {job.id}")
def register_cli(app) -> None:
app.cli.add_command(promote_admin)
app.cli.add_command(create_user)
app.cli.add_command(seed_script_overlays)
app.cli.add_command(workshop_refresh)

47
l4d2web/tests/test_cli.py Normal file
View file

@ -0,0 +1,47 @@
"""Tests for the l4d2web Flask CLI subcommands."""
from __future__ import annotations
from click.testing import CliRunner
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.cli import workshop_refresh
from l4d2web.db import init_db, session_scope
from l4d2web.models import Job
@pytest.fixture
def app_env(tmp_path, monkeypatch):
db_url = f"sqlite:///{tmp_path/'cli.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
return app
def test_workshop_refresh_enqueues_job(app_env):
runner = CliRunner()
with app_env.app_context():
result = runner.invoke(workshop_refresh, [])
assert result.exit_code == 0, result.output
assert "enqueued refresh_workshop_items job" in result.output
with session_scope() as db:
jobs = db.scalars(select(Job).where(Job.operation == "refresh_workshop_items")).all()
assert len(jobs) == 1
assert jobs[0].state == "queued"
assert jobs[0].user_id is None
assert jobs[0].server_id is None
def test_workshop_refresh_is_idempotent_when_job_queued(app_env):
runner = CliRunner()
with app_env.app_context():
first = runner.invoke(workshop_refresh, [])
second = runner.invoke(workshop_refresh, [])
assert first.exit_code == 0
assert second.exit_code == 0
assert "already queued" in second.output
with session_scope() as db:
jobs = db.scalars(select(Job).where(Job.operation == "refresh_workshop_items")).all()
assert len(jobs) == 1, "must not insert a second job when one is already queued"