"""Tests for the job worker's ``can_start`` scheduling checks and stale-job recovery."""

from dataclasses import dataclass

import pytest

from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs


@dataclass
class DummyJob:
    """Minimal job stand-in holding only the fields these tests hand to ``can_start``."""

    operation: str
    server_id: int | None = None


@pytest.fixture
def worker_fixture(tmp_path, monkeypatch):
    """Build the app against a SQLite file in ``tmp_path`` and return a helper object.

    The SQLite URL is exported as ``DATABASE_URL`` and also passed in the app
    config; ``init_db()`` is called once the app has been created. The
    returned object exposes ``run_once()`` and ``recover_stale_jobs()``.
    """
    # Function-local imports, kept as in the original layout of this module.
    from l4d2web.app import create_app
    from l4d2web.db import init_db

    database_url = f"sqlite:///{tmp_path/'worker.db'}"
    monkeypatch.setenv("DATABASE_URL", database_url)

    config = {"TESTING": True, "DATABASE_URL": database_url, "SECRET_KEY": "test"}
    app = create_app(config)
    init_db()

    class WorkerFixture:
        """Helper exposing the scheduler probes and the recovery call to tests."""

        def run_once(self):
            """Ask ``can_start`` about three jobs while server 1 is marked as running.

            Returns a dict with the ``can_start`` result for: a ``start`` job
            on server 1, a ``start`` job on server 2, and an ``install`` job
            with no server id.
            """
            scheduler = SchedulerState()
            scheduler.running_servers.add(1)

            def startable(operation, server_id):
                # Wrap the arguments in a DummyJob and query the shared state.
                job = DummyJob(operation=operation, server_id=server_id)
                return can_start(job, scheduler)

            # Dict values are evaluated top to bottom, preserving the probe order.
            return {
                "same_server_parallel": startable("start", 1),
                "different_servers_parallel": startable("start", 2),
                "install_parallel": startable("install", None),
            }

        def recover_stale_jobs(self):
            """Call the module-level ``recover_stale_jobs`` inside the app context."""
            # The bare name below resolves to the imported function, not this
            # method: class-level names are not visible from method bodies.
            with app.app_context():
                recovered = recover_stale_jobs()
            return recovered

    return WorkerFixture()


def test_same_server_jobs_serialized(worker_fixture) -> None:
    """A ``start`` job for the already-running server 1 must be refused."""
    assert worker_fixture.run_once()["same_server_parallel"] is False


def test_different_servers_can_run_parallel(worker_fixture) -> None:
    """A ``start`` job for server 2 must be allowed while server 1 is running."""
    assert worker_fixture.run_once()["different_servers_parallel"] is True


def test_install_global_exclusive(worker_fixture) -> None:
    """An ``install`` job without a server id must be refused while server 1 runs."""
    assert worker_fixture.run_once()["install_parallel"] is False


def test_recover_stale_running_jobs(worker_fixture) -> None:
    """``recover_stale_jobs`` must return a non-negative value (presumably a count)."""
    assert worker_fixture.recover_stale_jobs() >= 0