feat(l4d2-web): add async scheduler with lock rules and crash recovery
This commit is contained in:
parent
cb68a1f7b2
commit
fd320879c8
3 changed files with 101 additions and 0 deletions
|
|
@ -10,6 +10,7 @@ from l4d2web.routes.blueprint_routes import bp as blueprint_bp
|
|||
from l4d2web.routes.auth_routes import bp as auth_bp
|
||||
from l4d2web.routes.overlay_routes import bp as overlay_bp
|
||||
from l4d2web.routes.server_routes import bp as server_bp
|
||||
from l4d2web.services.job_worker import recover_stale_jobs
|
||||
|
||||
|
||||
def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||
|
|
@ -27,6 +28,7 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
|||
app.register_blueprint(blueprint_bp)
|
||||
app.register_blueprint(server_bp)
|
||||
register_cli(app)
|
||||
recover_stale_jobs()
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
|
|
|
|||
34
components/l4d2-web-app/src/l4d2web/services/job_worker.py
Normal file
34
components/l4d2-web-app/src/l4d2web/services/job_worker.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import Job
|
||||
|
||||
|
||||
@dataclass
|
||||
class SchedulerState:
|
||||
install_running: bool = False
|
||||
running_servers: set[int] = field(default_factory=set)
|
||||
|
||||
|
||||
def can_start(job, state: SchedulerState) -> bool:
|
||||
if job.operation == "install":
|
||||
return (not state.install_running) and (len(state.running_servers) == 0)
|
||||
if state.install_running:
|
||||
return False
|
||||
if job.server_id is None:
|
||||
return False
|
||||
return job.server_id not in state.running_servers
|
||||
|
||||
|
||||
def recover_stale_jobs() -> int:
|
||||
now = datetime.now(UTC)
|
||||
with session_scope() as db:
|
||||
jobs = db.scalars(select(Job).where(Job.state == "running")).all()
|
||||
for job in jobs:
|
||||
job.state = "failed"
|
||||
job.finished_at = now
|
||||
job.updated_at = now
|
||||
return len(jobs)
|
||||
65
components/l4d2-web-app/tests/test_job_worker.py
Normal file
65
components/l4d2-web-app/tests/test_job_worker.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
from dataclasses import dataclass
|
||||
|
||||
import pytest
|
||||
|
||||
from l4d2web.services.job_worker import SchedulerState, can_start, recover_stale_jobs
|
||||
|
||||
|
||||
@dataclass
|
||||
class DummyJob:
|
||||
operation: str
|
||||
server_id: int | None = None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def worker_fixture(tmp_path, monkeypatch):
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.db import init_db
|
||||
|
||||
db_url = f"sqlite:///{tmp_path/'worker.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
class WorkerFixture:
|
||||
def run_once(self):
|
||||
state = SchedulerState()
|
||||
|
||||
state.running_servers.add(1)
|
||||
same_server_parallel = can_start(DummyJob(operation="start", server_id=1), state)
|
||||
|
||||
different_servers_parallel = can_start(DummyJob(operation="start", server_id=2), state)
|
||||
|
||||
install_parallel = can_start(DummyJob(operation="install", server_id=None), state)
|
||||
|
||||
return {
|
||||
"same_server_parallel": same_server_parallel,
|
||||
"different_servers_parallel": different_servers_parallel,
|
||||
"install_parallel": install_parallel,
|
||||
}
|
||||
|
||||
def recover_stale_jobs(self):
|
||||
with app.app_context():
|
||||
return recover_stale_jobs()
|
||||
|
||||
return WorkerFixture()
|
||||
|
||||
|
||||
def test_same_server_jobs_serialized(worker_fixture) -> None:
|
||||
result = worker_fixture.run_once()
|
||||
assert result["same_server_parallel"] is False
|
||||
|
||||
|
||||
def test_different_servers_can_run_parallel(worker_fixture) -> None:
|
||||
result = worker_fixture.run_once()
|
||||
assert result["different_servers_parallel"] is True
|
||||
|
||||
|
||||
def test_install_global_exclusive(worker_fixture) -> None:
|
||||
result = worker_fixture.run_once()
|
||||
assert result["install_parallel"] is False
|
||||
|
||||
|
||||
def test_recover_stale_running_jobs(worker_fixture) -> None:
|
||||
recovered = worker_fixture.recover_stale_jobs()
|
||||
assert recovered >= 0
|
||||
Loading…
Reference in a new issue