left4me/l4d2web/tests/test_job_worker.py
mwiegand 92d6ebbe82
feat(l4d2-web): managed global map overlays with daily refresh
Adds two managed system overlays (l4d2center-maps, cedapug-maps) that
fetch curated map archives from upstream sources and reconcile addons
symlinks for non-Steam maps. A daily systemd timer enqueues a coalesced
refresh_global_overlays worker job; downloads, extraction, and rebuilds
run in the existing job worker and surface in the job log UI.

Schema: GlobalOverlaySource / GlobalOverlayItem / GlobalOverlayItemFile
plus nullable Job.user_id so system jobs render as "system" in the UI.
The new builder reconciles symlinks against the per-source vpk cache
and leaves foreign symlinks untouched. Initialize-time guard refuses
to mount a partial overlay if any expected vpk is missing from cache.

Refresh service uses shutil.move to handle EXDEV when /tmp and the
cache live on different filesystems.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 08:05:14 +02:00

747 lines
27 KiB
Python

import os
import subprocess
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from types import SimpleNamespace

import pytest
from sqlalchemy import select

from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import (
    Blueprint,
    BlueprintOverlay,
    Job,
    Overlay,
    OverlayWorkshopItem,
    Server,
    User,
    WorkshopItem,
)
from l4d2web.services import l4d2_facade
from l4d2web.services.host_commands import CommandCancelledError
from l4d2web.services.job_worker import (
    SchedulerState,
    build_scheduler_state,
    can_start,
    enqueue_build_overlay,
    recover_stale_jobs,
    run_worker_once,
)
@dataclass
class DummyJob:
    """Lightweight stand-in for a Job row in scheduler-predicate tests.

    Only the attributes that ``can_start`` inspects are modeled.
    """

    # Job operation name, e.g. "start", "install", "build_overlay".
    operation: str
    # Target server, or None for global operations such as "install".
    server_id: int | None = None
    # Target overlay for "build_overlay" jobs.
    overlay_id: int | None = None
@pytest.fixture
def worker_app(tmp_path, monkeypatch):
    """Create an app bound to a throwaway sqlite database with a fresh schema."""
    from l4d2web.app import create_app

    database_url = f"sqlite:///{tmp_path / 'worker.db'}"
    monkeypatch.setenv("DATABASE_URL", database_url)
    application = create_app(
        {"TESTING": True, "DATABASE_URL": database_url, "SECRET_KEY": "test"}
    )
    init_db()
    return application
@pytest.fixture
def seeded_worker(worker_app):
    """Seed one user, one blueprint and two servers; returns (app, ids)."""
    with session_scope() as session:
        owner = User(username="alice", password_digest=hash_password("secret"), admin=False)
        session.add(owner)
        session.flush()
        bp = Blueprint(user_id=owner.id, name="default", arguments="[]", config="[]")
        session.add(bp)
        session.flush()
        # "alpha" starts with a stale error so tests can observe it being cleared.
        alpha = Server(
            user_id=owner.id,
            blueprint_id=bp.id,
            name="alpha",
            port=27015,
            last_error="old error",
        )
        bravo = Server(user_id=owner.id, blueprint_id=bp.id, name="bravo", port=27016)
        session.add_all([alpha, bravo])
        session.flush()
        ids = SimpleNamespace(user=owner.id, server_one=alpha.id, server_two=bravo.id)
    return worker_app, ids
def add_job(
    user_id: int,
    operation: str,
    *,
    server_id: int | None,
    state: str = "queued",
    created_at: datetime | None = None,
    overlay_id: int | None = None,
) -> int:
    """Insert a Job row directly into the database and return its id."""
    now = datetime.now(UTC)
    stamp = created_at or now
    with session_scope() as session:
        job = Job(
            user_id=user_id,
            server_id=server_id,
            overlay_id=overlay_id,
            operation=operation,
            state=state,
            created_at=stamp,
            updated_at=stamp,
        )
        if state == "running":
            # A claimed job always carries a start timestamp.
            job.started_at = now
        session.add(job)
        session.flush()
        return job.id
def load_job(job_id: int) -> Job:
    """Fetch a Job by primary key, failing the test if it does not exist."""
    with session_scope() as session:
        row = session.scalar(select(Job).where(Job.id == job_id))
        assert row is not None
        return row
def test_scheduler_predicates() -> None:
    """A busy server blocks its own jobs and any global install."""
    busy = SchedulerState()
    busy.running_servers.add(1)
    assert can_start(DummyJob(operation="start", server_id=1), busy) is False
    # A different server is unaffected.
    assert can_start(DummyJob(operation="start", server_id=2), busy) is True
    # Global install must wait for all server jobs.
    assert can_start(DummyJob(operation="install", server_id=None), busy) is False
def test_run_worker_once_claims_oldest_runnable_job(seeded_worker, monkeypatch) -> None:
    """The worker claims jobs strictly in created_at order."""
    app, ids = seeded_worker
    initialized: list[int] = []
    two_minutes_ago = datetime.now(UTC) - timedelta(minutes=2)
    just_now = datetime.now(UTC)
    old_job_id = add_job(ids.user, "initialize", server_id=ids.server_one, created_at=two_minutes_ago)
    new_job_id = add_job(ids.user, "initialize", server_id=ids.server_two, created_at=just_now)
    monkeypatch.setattr(
        l4d2_facade, "initialize_server", lambda server_id, **kwargs: initialized.append(server_id)
    )
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
    with app.app_context():
        assert run_worker_once() is True
    # Only the older job ran; the newer one is still waiting.
    assert initialized == [ids.server_one]
    assert load_job(old_job_id).state == "succeeded"
    assert load_job(new_job_id).state == "queued"
def test_successful_start_job_logs_and_refreshes_server_state(seeded_worker, monkeypatch) -> None:
    """A start job runs initialize then start, logs both phases, and refreshes state."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "start", server_id=ids.server_one)
    seen: list[tuple[str, int]] = []

    def fake_initialize(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        del should_cancel
        seen.append(("initialize", server_id))
        on_stdout("Step: creating instance directories...")
        on_stderr("init warning")

    def fake_start(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        del should_cancel
        seen.append(("start", server_id))
        on_stdout("Step: mounting runtime overlay...")

    monkeypatch.setattr(l4d2_facade, "initialize_server", fake_initialize)
    monkeypatch.setattr(l4d2_facade, "start_server", fake_start)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="running"))
    with app.app_context():
        assert run_worker_once() is True
    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [(row.seq, row.stream, row.line) for row in job_logs_for(session, job_id)]
        # initialize must run before start, both against the same server.
        assert seen == [("initialize", ids.server_one), ("start", ids.server_one)]
        assert job is not None
        assert job.state == "succeeded"
        assert job.exit_code == 0
        assert job.started_at is not None
        assert job.finished_at is not None
        assert job.updated_at is not None
        expected_log = [
            (1, "stdout", "starting initialize for alpha"),
            (2, "stdout", "Step: creating instance directories..."),
            (3, "stderr", "init warning"),
            (4, "stdout", "finished initialize successfully"),
            (5, "stdout", "starting start for alpha"),
            (6, "stdout", "Step: mounting runtime overlay..."),
            (7, "stdout", "finished start successfully"),
        ]
        assert lines == expected_log
        assert server is not None
        assert server.actual_state == "running"
        # The stale error seeded by the fixture is cleared on success.
        assert server.last_error == ""
def job_logs_for(session, job_id: int):
    """Return this job's JobLog rows ordered by sequence number."""
    from l4d2web.models import JobLog

    query = select(JobLog).where(JobLog.job_id == job_id).order_by(JobLog.seq)
    return session.scalars(query).all()
def test_called_process_error_fails_job_and_sets_server_error(seeded_worker, monkeypatch) -> None:
    """A CalledProcessError fails the job with the subprocess exit code."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
    with app.app_context():
        assert run_worker_once() is True
    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]
        assert job is not None
        assert job.state == "failed"
        # The subprocess return code is surfaced as the job exit code.
        assert job.exit_code == 7
        assert server is not None
        assert server.last_error == "stop failed"
        assert "starting stop for alpha" in lines
        assert "stop failed" in lines
def test_refresh_failure_does_not_hide_operation_failure(seeded_worker, monkeypatch) -> None:
    """A failing status refresh is logged but never masks the original failure."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    def broken_status(name):
        raise RuntimeError("status down")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", broken_status)
    with app.app_context():
        assert run_worker_once() is True
    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]
        assert job is not None
        assert job.state == "failed"
        # Exit code 7 from the stop command wins over the refresh error.
        assert job.exit_code == 7
        assert server is not None
        assert server.last_error == "stop failed"
        assert "starting stop for alpha" in lines
        assert "stop failed" in lines
        assert "status refresh failed: status down" in lines
def test_unexpected_exception_fails_job_with_exit_code_one(seeded_worker, monkeypatch) -> None:
    """Any non-subprocess exception maps to the generic exit code 1."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "delete", server_id=ids.server_one)

    def exploding_delete(server_id, **kwargs):
        raise RuntimeError("boom")

    monkeypatch.setattr(l4d2_facade, "delete_server", exploding_delete)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))
    with app.app_context():
        assert run_worker_once() is True
    job = load_job(job_id)
    assert job.state == "failed"
    assert job.exit_code == 1
def test_same_server_jobs_do_not_overlap(seeded_worker, monkeypatch) -> None:
    """A queued job for a server with a running job is not claimed."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    blocked_id = add_job(ids.user, "stop", server_id=ids.server_one)
    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(blocked_id).state == "queued"
def test_same_server_jobs_wait_while_job_is_cancelling(seeded_worker, monkeypatch) -> None:
    """A job in the "cancelling" state still occupies its server."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="cancelling")
    waiting_id = add_job(ids.user, "stop", server_id=ids.server_one)
    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(waiting_id).state == "queued"
def test_cancelled_process_finishes_job_as_cancelled(seeded_worker, monkeypatch) -> None:
    """Cancellation mid-operation ends the job as "cancelled" with a warning on the server."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fake_stop(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        assert server_id == ids.server_one
        assert should_cancel is not None
        # Flip the job to "cancelling" mid-operation, as the UI would.
        with session_scope() as session:
            running = session.scalar(select(Job).where(Job.id == job_id))
            assert running is not None
            running.state = "cancelling"
        assert should_cancel() is True
        on_stderr("terminating")
        raise CommandCancelledError(returncode=-15, cmd=["stop"], output="", stderr="")

    monkeypatch.setattr(l4d2_facade, "stop_server", fake_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))
    with app.app_context():
        assert run_worker_once() is True
    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]
        assert job is not None
        assert job.state == "cancelled"
        # SIGTERM's return code is preserved on the job.
        assert job.exit_code == -15
        assert job.finished_at is not None
        assert server is not None
        assert server.last_error == "job cancelled; runtime state may be partial"
        assert "starting stop for alpha" in lines
        assert "terminating" in lines
        assert "job cancelled; runtime state may be partial" in lines
def test_different_server_jobs_can_be_claimed_while_other_server_runs(seeded_worker, monkeypatch) -> None:
    """A running job on one server does not block jobs for another server."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    other_id = add_job(ids.user, "stop", server_id=ids.server_two)
    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: None)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
    with app.app_context():
        assert run_worker_once() is True
    assert load_job(other_id).state == "succeeded"
def test_install_job_not_claimed_while_server_job_runs(seeded_worker) -> None:
    """A global install waits until every server job has finished."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    install_id = add_job(ids.user, "install", server_id=None)
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(install_id).state == "queued"
def test_server_job_not_claimed_while_install_runs(seeded_worker) -> None:
    """A running global install blocks all server jobs."""
    app, ids = seeded_worker
    add_job(ids.user, "install", server_id=None, state="running")
    waiting_id = add_job(ids.user, "initialize", server_id=ids.server_one)
    with app.app_context():
        assert run_worker_once() is False
    assert load_job(waiting_id).state == "queued"
def test_recover_stale_running_jobs(worker_app) -> None:
    """Jobs left in "running" (e.g. after a crash) are failed on recovery."""
    with session_scope() as session:
        user = User(username="bob", password_digest=hash_password("secret"), admin=False)
        session.add(user)
        session.flush()
        stale = Job(user_id=user.id, server_id=None, operation="install", state="running")
        session.add(stale)
        session.flush()
        job_id = stale.id
    with worker_app.app_context():
        assert recover_stale_jobs() == 1
    recovered = load_job(job_id)
    assert recovered.state == "failed"
    assert recovered.finished_at is not None
def test_worker_startup_skipped_during_testing(monkeypatch, tmp_path) -> None:
    """TESTING=True must not spawn background workers."""
    from l4d2web import app as app_module

    started_with = []
    db_url = f"sqlite:///{tmp_path / 'startup.db'}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: started_with.append(app))
    app_module.create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    assert started_with == []
def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
    """Outside testing, create_app starts the job workers with the new app."""
    from l4d2web import app as app_module

    started_with = []
    db_url = f"sqlite:///{tmp_path / 'startup-enabled.db'}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: started_with.append(app))
    app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    assert started_with == [app]
# ---------------------------------------------------------------------------
# Scheduler truth table for the new operations (build_overlay,
# refresh_workshop_items) and their interaction with existing ops.
# ---------------------------------------------------------------------------
def test_install_blocks_build_overlay_and_refresh() -> None:
    """While install runs, neither overlay builds nor refreshes may start."""
    installing = SchedulerState(install_running=True)
    assert can_start(DummyJob(operation="build_overlay", overlay_id=1), installing) is False
    assert can_start(DummyJob(operation="refresh_workshop_items"), installing) is False
def test_refresh_blocks_install_build_overlay_and_servers() -> None:
    """A running workshop refresh blocks every other operation class."""
    refreshing = SchedulerState(refresh_running=True)
    blocked_jobs = (
        DummyJob(operation="install"),
        DummyJob(operation="build_overlay", overlay_id=1),
        DummyJob(operation="start", server_id=1),
    )
    for job in blocked_jobs:
        assert can_start(job, refreshing) is False
def test_build_overlay_blocks_same_overlay_only() -> None:
    """Concurrent overlay builds are allowed, except on the same overlay."""
    building = SchedulerState()
    building.running_overlays.add(7)
    assert can_start(DummyJob(operation="build_overlay", overlay_id=7), building) is False
    assert can_start(DummyJob(operation="build_overlay", overlay_id=8), building) is True
def test_install_blocked_by_active_build_overlay() -> None:
    """An in-flight overlay build holds back the global install."""
    building = SchedulerState()
    building.running_overlays.add(7)
    assert can_start(DummyJob(operation="install"), building) is False
def test_refresh_blocked_by_active_build_overlay() -> None:
    """An in-flight overlay build holds back the workshop refresh."""
    building = SchedulerState()
    building.running_overlays.add(7)
    assert can_start(DummyJob(operation="refresh_workshop_items"), building) is False
def test_server_job_blocked_when_blueprint_overlay_is_building() -> None:
    """Only servers whose blueprint uses the building overlay are blocked."""
    building = SchedulerState()
    building.running_overlays.add(7)
    building.blocked_servers_by_overlay.add(42)
    assert can_start(DummyJob(operation="start", server_id=42), building) is False
    # Other servers (whose blueprints don't reference overlay 7) are NOT blocked.
    assert can_start(DummyJob(operation="start", server_id=43), building) is True
@pytest.fixture
def overlay_seeded_worker(seeded_worker):
    """Extend seeded_worker with a workshop overlay on server_one's blueprint."""
    app, ids = seeded_worker
    with session_scope() as s:
        overlay = Overlay(name="ws", path="9", type="workshop", user_id=ids.user)
        s.add(overlay)
        s.flush()
        # Move server_two onto a different blueprint with NO workshop overlay,
        # so the test can distinguish "blocked by overlay build" from "any
        # server is blocked".
        bp_with_overlay = s.scalar(select(Blueprint).where(Blueprint.user_id == ids.user))
        s.add(BlueprintOverlay(blueprint_id=bp_with_overlay.id, overlay_id=overlay.id, position=0))
        plain_bp = Blueprint(user_id=ids.user, name="no-overlay", arguments="[]", config="[]")
        s.add(plain_bp)
        s.flush()
        second_server = s.scalar(select(Server).where(Server.id == ids.server_two))
        second_server.blueprint_id = plain_bp.id
        ids.overlay = overlay.id
    return app, ids
def test_scheduler_state_finds_servers_blocked_by_running_build(overlay_seeded_worker) -> None:
    """build_scheduler_state maps a running overlay build to its dependent servers."""
    app, ids = overlay_seeded_worker
    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
    with session_scope() as s:
        state = build_scheduler_state(s)
        assert ids.overlay in state.running_overlays
        # server_one's blueprint references the overlay; server_two's does not.
        assert ids.server_one in state.blocked_servers_by_overlay
        assert ids.server_two not in state.blocked_servers_by_overlay
def test_enqueue_build_overlay_creates_new_job_when_none_pending(overlay_seeded_worker) -> None:
    """With no pending job, enqueue creates a fresh queued build_overlay job."""
    app, ids = overlay_seeded_worker
    with session_scope() as s:
        created = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
        assert created.operation == "build_overlay"
        assert created.overlay_id == ids.overlay
        assert created.server_id is None
        assert created.state == "queued"
def test_enqueue_build_overlay_coalesces_against_pending(overlay_seeded_worker) -> None:
    """A second enqueue for the same overlay reuses the pending job."""
    app, ids = overlay_seeded_worker
    with session_scope() as s:
        first_id = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user).id
    with session_scope() as s:
        second = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
        assert second.id == first_id, "should coalesce against the pending job"
    with session_scope() as s:
        pending = s.scalars(
            select(Job).where(Job.operation == "build_overlay", Job.overlay_id == ids.overlay)
        ).all()
        assert len(pending) == 1
def test_enqueue_build_overlay_does_not_coalesce_against_running(overlay_seeded_worker) -> None:
    """A running build is never reused; a new queued job is created instead."""
    app, ids = overlay_seeded_worker
    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
    with session_scope() as s:
        fresh = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
        assert fresh.state == "queued"

    def build_jobs_in(session, state):
        # All build_overlay jobs for this overlay in the given state.
        return session.scalars(
            select(Job).where(
                Job.operation == "build_overlay",
                Job.overlay_id == ids.overlay,
                Job.state == state,
            )
        ).all()

    with session_scope() as s:
        assert len(build_jobs_in(s, "running")) == 1
        assert len(build_jobs_in(s, "queued")) == 1
def test_run_worker_once_dispatches_build_overlay(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
    """The worker claims a build_overlay job and symlinks cached vpks into the overlay.

    Fix: the mid-function ``import os`` is removed in favor of the module-level
    import, matching the file's import conventions.
    """
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    with session_scope() as s:
        wi = WorkshopItem(steam_id="1001", title="A", filename="a.vpk", file_url="u", file_size=3, time_updated=1)
        s.add(wi)
        s.flush()
        s.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=wi.id))
    cache = tmp_path / "workshop_cache"
    cache.mkdir()
    (cache / "1001.vpk").write_bytes(b"abc")
    # Backdate the cached vpk's timestamps so it predates the item metadata.
    os.utime(cache / "1001.vpk", (1, 1))
    # Mark item as downloaded.
    with session_scope() as s:
        wi = s.query(WorkshopItem).filter_by(steam_id="1001").one()
        wi.last_downloaded_at = datetime.now(UTC)
    job_id = add_job(ids.user, "build_overlay", server_id=None, overlay_id=ids.overlay)
    with app.app_context():
        assert run_worker_once() is True
    job = load_job(job_id)
    assert job.state == "succeeded", job.exit_code
    addons = tmp_path / "overlays" / "9" / "left4dead2" / "addons"
    assert (addons / "1001.vpk").is_symlink()
def test_run_worker_once_dispatches_refresh(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
    """The worker claims a refresh_workshop_items job and runs the refresh hook.

    Fix: the unused local import of ``steam_workshop`` is removed.
    """
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    from l4d2web.services import job_worker

    refresh_calls = []

    def fake_refresh_workshop_items(*, on_stdout, on_stderr, should_cancel):
        refresh_calls.append(True)
        on_stdout("refresh phase complete (downloaded=0 errors=0)")
        return []  # no overlays affected

    monkeypatch.setattr(job_worker, "_run_refresh_workshop_items", fake_refresh_workshop_items)
    job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)
    with app.app_context():
        assert run_worker_once() is True
    assert refresh_calls == [True]
    job = load_job(job_id)
    assert job.state == "succeeded"
def test_refresh_downloads_serially_to_keep_web_worker_responsive(
    overlay_seeded_worker, monkeypatch, tmp_path
) -> None:
    """The refresh helper calls refresh_all with a single download worker."""
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    from l4d2web.services import job_worker, steam_workshop

    with session_scope() as s:
        item = WorkshopItem(
            steam_id="1001",
            title="A",
            filename="old.vpk",
            file_url="https://example.com/old.vpk",
            file_size=1,
            time_updated=1,
        )
        s.add(item)
        s.flush()
        s.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=item.id))
    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="A",
        filename="new.vpk",
        file_url="https://example.com/new.vpk",
        file_size=2,
        time_updated=2,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    monkeypatch.setattr(steam_workshop, "fetch_metadata_batch", lambda steam_ids, *, mode: [meta])
    observed = []

    def fake_refresh_all(metas, cache_root, *, executor_workers=8, should_cancel=None):
        observed.append((list(metas), executor_workers, should_cancel is not None))
        return steam_workshop.RefreshReport(downloaded=1, errors=0)

    monkeypatch.setattr(steam_workshop, "refresh_all", fake_refresh_all)
    with app.app_context():
        affected = job_worker._run_refresh_workshop_items(
            on_stdout=lambda _line: None,
            on_stderr=lambda _line: None,
            should_cancel=lambda: False,
        )
    assert affected == [ids.overlay]
    # executor_workers pinned to 1 keeps downloads serial.
    assert observed == [([meta], 1, True)]
def test_refresh_job_enqueues_build_overlay_without_locking_its_final_log(
    overlay_seeded_worker, monkeypatch, tmp_path
) -> None:
    """A refresh that updates an item queues a follow-up build and still logs its summary."""
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    from l4d2web.services import steam_workshop

    with session_scope() as s:
        item = WorkshopItem(
            steam_id="1001",
            title="A",
            filename="old.vpk",
            file_url="https://example.com/old.vpk",
            file_size=1,
            time_updated=1,
        )
        s.add(item)
        s.flush()
        s.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=item.id))
    # Upstream metadata advertises a newer file than the stored row.
    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="A",
        filename="new.vpk",
        file_url="https://example.com/new.vpk",
        file_size=2,
        time_updated=2,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )
    monkeypatch.setattr(steam_workshop, "fetch_metadata_batch", lambda steam_ids, *, mode: [meta])
    monkeypatch.setattr(
        steam_workshop,
        "refresh_all",
        lambda metas, cache_root, **kwargs: steam_workshop.RefreshReport(downloaded=1, errors=0),
    )
    job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)
    with app.app_context():
        assert run_worker_once() is True
    with session_scope() as db:
        refresh_job = db.scalar(select(Job).where(Job.id == job_id))
        build_job = db.scalar(
            select(Job).where(
                Job.operation == "build_overlay",
                Job.overlay_id == ids.overlay,
                Job.state == "queued",
            )
        )
        lines = [row.line for row in job_logs_for(db, job_id)]
        assert refresh_job is not None
        assert refresh_job.state == "succeeded"
        assert build_job is not None
        assert "enqueued build_overlay for 1 overlay(s)" in lines
def test_refresh_global_overlays_blocks_install_build_refresh_and_servers() -> None:
    """A running global-overlay refresh blocks every other operation class.

    Fix: drops the redundant local re-import of ``SchedulerState``/``can_start``,
    which are already module-level imports used by the sibling tests.
    """
    state = SchedulerState(refresh_global_overlays_running=True)
    assert can_start(DummyJob(operation="install"), state) is False
    assert can_start(DummyJob(operation="refresh_workshop_items"), state) is False
    assert can_start(DummyJob(operation="build_overlay", overlay_id=1), state) is False
    assert can_start(DummyJob(operation="start", server_id=1), state) is False
def test_refresh_global_overlays_waits_for_active_work() -> None:
    """A global-overlay refresh is only claimable when nothing else runs.

    Fix: drops the redundant local re-import of ``SchedulerState``/``can_start``,
    which are already module-level imports used by the sibling tests.
    """
    assert can_start(DummyJob(operation="refresh_global_overlays"), SchedulerState(install_running=True)) is False
    assert can_start(DummyJob(operation="refresh_global_overlays"), SchedulerState(refresh_running=True)) is False
    overlay_busy = SchedulerState()
    overlay_busy.running_overlays.add(1)
    assert can_start(DummyJob(operation="refresh_global_overlays"), overlay_busy) is False
    server_busy = SchedulerState()
    server_busy.running_servers.add(1)
    assert can_start(DummyJob(operation="refresh_global_overlays"), server_busy) is False
def test_run_worker_once_dispatches_refresh_global_overlays(seeded_worker, monkeypatch) -> None:
    """The worker claims a system (user_id=None) refresh_global_overlays job.

    Fixes: drops the redundant local re-imports of ``Job``/``session_scope``
    (already imported at module level); wraps the worker call in
    ``app.app_context()`` for consistency with every other dispatch test;
    adds the missing ``-> None`` annotation.
    """
    from l4d2web.services import job_worker

    app, _ids = seeded_worker
    called = []

    def fake_refresh(*, on_stdout, on_stderr, should_cancel):
        called.append("refresh")
        on_stdout("global refresh complete")
        return ["l4d2center-maps"]

    monkeypatch.setattr(job_worker, "_run_refresh_global_overlays", fake_refresh)
    with session_scope() as db:
        # user_id=None marks the job as system-initiated (rendered as "system").
        db.add(Job(user_id=None, server_id=None, operation="refresh_global_overlays", state="queued"))
    with app.app_context():
        assert job_worker.run_worker_once() is True
    assert called == ["refresh"]