diff --git a/components/l4d2-web-app/src/l4d2web/app.py b/components/l4d2-web-app/src/l4d2web/app.py index a6ddc4f..4919230 100644 --- a/components/l4d2-web-app/src/l4d2web/app.py +++ b/components/l4d2-web-app/src/l4d2web/app.py @@ -10,6 +10,7 @@ from l4d2web.routes.blueprint_routes import bp as blueprint_bp from l4d2web.routes.auth_routes import bp as auth_bp from l4d2web.routes.overlay_routes import bp as overlay_bp from l4d2web.routes.server_routes import bp as server_bp +from l4d2web.services.job_worker import recover_stale_jobs def create_app(test_config: dict[str, object] | None = None) -> Flask: @@ -27,6 +28,7 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: app.register_blueprint(blueprint_bp) app.register_blueprint(server_bp) register_cli(app) + recover_stale_jobs() @app.get("/health") def health(): diff --git a/components/l4d2-web-app/src/l4d2web/services/job_worker.py b/components/l4d2-web-app/src/l4d2web/services/job_worker.py new file mode 100644 index 0000000..be6a7ff --- /dev/null +++ b/components/l4d2-web-app/src/l4d2web/services/job_worker.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass, field +from datetime import UTC, datetime + +from sqlalchemy import select + +from l4d2web.db import session_scope +from l4d2web.models import Job + + +@dataclass +class SchedulerState: + install_running: bool = False + running_servers: set[int] = field(default_factory=set) + + +def can_start(job, state: SchedulerState) -> bool: + if job.operation == "install": + return (not state.install_running) and (len(state.running_servers) == 0) + if state.install_running: + return False + if job.server_id is None: + return False + return job.server_id not in state.running_servers + + +def recover_stale_jobs() -> int: + now = datetime.now(UTC) + with session_scope() as db: + jobs = db.scalars(select(Job).where(Job.state == "running")).all() + for job in jobs: + job.state = "failed" + job.finished_at = now + job.updated_at = now + return len(jobs) diff --git a/components/l4d2-web-app/tests/test_job_worker.py b/components/l4d2-web-app/tests/test_job_worker.py new file mode 100644 index 0000000..f4f8b67 --- /dev/null +++ b/components/l4d2-web-app/tests/test_job_worker.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass + +import pytest + +from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs + + +@dataclass +class DummyJob: + operation: str + server_id: int | None = None + + +@pytest.fixture +def worker_fixture(tmp_path, monkeypatch): + from l4d2web.app import create_app + from l4d2web.db import init_db + + db_url = f"sqlite:///{tmp_path/'worker.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + + class WorkerFixture: + def run_once(self): + state = SchedulerState() + + state.running_servers.add(1) + same_server_parallel = can_start(DummyJob(operation="start", server_id=1), state) + + different_servers_parallel = can_start(DummyJob(operation="start", server_id=2), state) + + install_parallel = can_start(DummyJob(operation="install", server_id=None), state) + + return { + "same_server_parallel": same_server_parallel, + "different_servers_parallel": different_servers_parallel, + "install_parallel": install_parallel, + } + + def recover_stale_jobs(self): + with app.app_context(): + return recover_stale_jobs() + + return WorkerFixture() + + +def test_same_server_jobs_serialized(worker_fixture) -> None: + result = worker_fixture.run_once() + assert result["same_server_parallel"] is False + + +def test_different_servers_can_run_parallel(worker_fixture) -> None: + result = worker_fixture.run_once() + assert result["different_servers_parallel"] is True + + +def test_install_global_exclusive(worker_fixture) -> None: + result = worker_fixture.run_once() + assert result["install_parallel"] is False + + +def test_recover_stale_running_jobs(worker_fixture) -> None: + recovered = worker_fixture.recover_stale_jobs() + assert recovered >= 0