"""Tests for the background job worker: scheduling, execution, logging and recovery."""

import subprocess
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
from typing import NoReturn

import pytest
from sqlalchemy import select

from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, Job, JobLog, Server, User
from l4d2web.services import l4d2_facade
from l4d2web.services.host_commands import CommandCancelledError
from l4d2web.services.job_worker import (
    SchedulerState,
    can_start,
    recover_stale_jobs,
    run_worker_once,
)


@dataclass
class DummyJob:
    """Database-free job stand-in used to exercise ``can_start`` directly.

    Carries only ``operation`` and ``server_id``; presumably that is all
    ``can_start`` reads from a job — confirm if the predicate grows.
    """

    operation: str
    server_id: int | None = None


@pytest.fixture
def worker_app(tmp_path, monkeypatch):
    """Create a TESTING app backed by a per-test SQLite file and initialise its schema.

    The database URL is passed in the app config and also exported as the
    ``DATABASE_URL`` environment variable (presumably for code that reads it
    from the environment rather than the config — verify).
    """
    from l4d2web.app import create_app

    db_url = f"sqlite:///{tmp_path/'worker.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()
    return app


@pytest.fixture
def seeded_worker(worker_app):
    """Seed one user, one blueprint and two servers.

    Returns ``(app, ids)`` where ``ids`` exposes ``user``, ``server_one`` and
    ``server_two`` primary keys. ``server_one`` ("alpha", port 27015) starts
    with a non-empty ``last_error`` so tests can check whether it is cleared
    or overwritten; ``server_two`` is "bravo" on port 27016.
    """
    with session_scope() as session:
        user = User(username="alice", password_digest=hash_password("secret"), admin=False)
        session.add(user)
        session.flush()
        blueprint = Blueprint(user_id=user.id, name="default", arguments="[]", config="[]")
        session.add(blueprint)
        session.flush()
        server_one = Server(
            user_id=user.id,
            blueprint_id=blueprint.id,
            name="alpha",
            port=27015,
            last_error="old error",
        )
        server_two = Server(user_id=user.id, blueprint_id=blueprint.id, name="bravo", port=27016)
        session.add_all([server_one, server_two])
        session.flush()
        ids = SimpleNamespace(user=user.id, server_one=server_one.id, server_two=server_two.id)
    return worker_app, ids


def add_job(
    user_id: int,
    operation: str,
    *,
    server_id: int | None,
    state: str = "queued",
    created_at: datetime | None = None,
) -> int:
    """Insert a ``Job`` row and return its id.

    ``created_at`` and ``updated_at`` both default to the current UTC time.
    Jobs inserted in the ``"running"`` state also get ``started_at`` set to now.
    """
    now = datetime.now(UTC)
    with session_scope() as session:
        job = Job(
            user_id=user_id,
            server_id=server_id,
            operation=operation,
            state=state,
            created_at=created_at or now,
            updated_at=created_at or now,
        )
        if state == "running":
            job.started_at = now
        session.add(job)
        session.flush()
        return job.id


def load_job(job_id: int) -> Job:
    """Fetch the ``Job`` with ``job_id``; fail the test if it does not exist.

    NOTE(review): callers read attributes after the session has closed, which
    relies on ``session_scope`` leaving loaded attributes usable (presumably
    ``expire_on_commit=False``) — confirm.
    """
    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        assert job is not None
        return job


def job_logs_for(session, job_id: int):
    """Return all ``JobLog`` rows for ``job_id`` ordered by their ``seq`` number."""
    return session.scalars(select(JobLog).where(JobLog.job_id == job_id).order_by(JobLog.seq)).all()


def raising(exc: Exception) -> Callable[..., NoReturn]:
    """Return a stand-in callable that raises ``exc`` regardless of its arguments.

    Used with ``monkeypatch.setattr`` to make a facade function fail.
    """

    def _raise(*args, **kwargs) -> NoReturn:
        raise exc

    return _raise


def test_scheduler_predicates() -> None:
    """``can_start`` blocks a busy server and install jobs while any server runs."""
    state = SchedulerState()
    state.running_servers.add(1)
    assert can_start(DummyJob(operation="start", server_id=1), state) is False
    assert can_start(DummyJob(operation="start", server_id=2), state) is True
    assert can_start(DummyJob(operation="install", server_id=None), state) is False


def test_run_worker_once_claims_oldest_runnable_job(seeded_worker, monkeypatch) -> None:
    """A single worker pass runs only the job with the earliest ``created_at``."""
    app, ids = seeded_worker
    calls = []
    older = datetime.now(UTC) - timedelta(minutes=2)
    newer = datetime.now(UTC)
    old_job_id = add_job(ids.user, "initialize", server_id=ids.server_one, created_at=older)
    new_job_id = add_job(ids.user, "initialize", server_id=ids.server_two, created_at=newer)

    monkeypatch.setattr(l4d2_facade, "initialize_server", lambda server_id, **kwargs: calls.append(server_id))
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))

    with app.app_context():
        assert run_worker_once() is True

    assert calls == [ids.server_one]
    assert load_job(old_job_id).state == "succeeded"
    assert load_job(new_job_id).state == "queued"


def test_successful_start_job_logs_and_refreshes_server_state(seeded_worker, monkeypatch) -> None:
    """A start job initializes then starts the server, logging each step's output
    between "starting ..." / "finished ..." lines, then refreshes the server state
    and clears its ``last_error``."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "start", server_id=ids.server_one)
    calls = []

    def fake_initialize(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        del should_cancel
        calls.append(("initialize", server_id))
        on_stdout("Step: creating instance directories...")
        on_stderr("init warning")

    def fake_start(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        del should_cancel
        calls.append(("start", server_id))
        on_stdout("Step: mounting runtime overlay...")

    monkeypatch.setattr(l4d2_facade, "initialize_server", fake_initialize)
    monkeypatch.setattr(l4d2_facade, "start_server", fake_start)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="running"))

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [(row.seq, row.stream, row.line) for row in job_logs_for(session, job_id)]

        assert calls == [("initialize", ids.server_one), ("start", ids.server_one)]
        assert job is not None
        assert job.state == "succeeded"
        assert job.exit_code == 0
        assert job.started_at is not None
        assert job.finished_at is not None
        assert job.updated_at is not None
        assert lines == [
            (1, "stdout", "starting initialize for alpha"),
            (2, "stdout", "Step: creating instance directories..."),
            (3, "stderr", "init warning"),
            (4, "stdout", "finished initialize successfully"),
            (5, "stdout", "starting start for alpha"),
            (6, "stdout", "Step: mounting runtime overlay..."),
            (7, "stdout", "finished start successfully"),
        ]
        assert server is not None
        assert server.actual_state == "running"
        assert server.last_error == ""


def test_called_process_error_fails_job_and_sets_server_error(seeded_worker, monkeypatch) -> None:
    """A ``CalledProcessError`` fails the job with the process return code and
    records its stderr both in the job log and as the server's ``last_error``."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]

        assert job is not None
        assert job.state == "failed"
        assert job.exit_code == 7
        assert server is not None
        assert server.last_error == "stop failed"
        assert "starting stop for alpha" in lines
        assert "stop failed" in lines


def test_refresh_failure_does_not_hide_operation_failure(seeded_worker, monkeypatch) -> None:
    """When the post-operation status refresh also fails, that failure is logged
    but the operation's exit code and error message are kept."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", raising(RuntimeError("status down")))

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]

        assert job is not None
        assert job.state == "failed"
        assert job.exit_code == 7
        assert server is not None
        assert server.last_error == "stop failed"
        assert "starting stop for alpha" in lines
        assert "stop failed" in lines
        assert "status refresh failed: status down" in lines


def test_unexpected_exception_fails_job_with_exit_code_one(seeded_worker, monkeypatch) -> None:
    """Any non-process exception from the operation fails the job with exit code 1."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "delete", server_id=ids.server_one)

    monkeypatch.setattr(l4d2_facade, "delete_server", raising(RuntimeError("boom")))
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))

    with app.app_context():
        assert run_worker_once() is True

    job = load_job(job_id)
    assert job.state == "failed"
    assert job.exit_code == 1


def test_same_server_jobs_do_not_overlap(seeded_worker, monkeypatch) -> None:
    """A queued job is not claimed while another job for the same server is running."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_one)

    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))

    with app.app_context():
        assert run_worker_once() is False

    assert load_job(queued_id).state == "queued"


def test_same_server_jobs_wait_while_job_is_cancelling(seeded_worker, monkeypatch) -> None:
    """A job in the "cancelling" state still blocks queued jobs for the same server."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="cancelling")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_one)

    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))

    with app.app_context():
        assert run_worker_once() is False

    assert load_job(queued_id).state == "queued"


def test_cancelled_process_finishes_job_as_cancelled(seeded_worker, monkeypatch) -> None:
    """Cancelling mid-operation makes ``should_cancel`` report True; the raised
    ``CommandCancelledError`` finishes the job as "cancelled" with its return code
    and marks the server's runtime state as possibly partial."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fake_stop(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        assert server_id == ids.server_one
        assert should_cancel is not None
        # Flip the job to "cancelling" while the operation is in progress.
        with session_scope() as session:
            job = session.scalar(select(Job).where(Job.id == job_id))
            assert job is not None
            job.state = "cancelling"
        assert should_cancel() is True
        on_stderr("terminating")
        raise CommandCancelledError(returncode=-15, cmd=["stop"], output="", stderr="")

    monkeypatch.setattr(l4d2_facade, "stop_server", fake_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [row.line for row in job_logs_for(session, job_id)]

        assert job is not None
        assert job.state == "cancelled"
        assert job.exit_code == -15
        assert job.finished_at is not None
        assert server is not None
        assert server.last_error == "job cancelled; runtime state may be partial"
        assert "starting stop for alpha" in lines
        assert "terminating" in lines
        assert "job cancelled; runtime state may be partial" in lines


def test_different_server_jobs_can_be_claimed_while_other_server_runs(seeded_worker, monkeypatch) -> None:
    """A running job on one server does not block queued jobs on another server."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_two)

    monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: None)
    monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))

    with app.app_context():
        assert run_worker_once() is True

    assert load_job(queued_id).state == "succeeded"


def test_install_job_not_claimed_while_server_job_runs(seeded_worker) -> None:
    """A server-less install job waits while any server job is running."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    install_id = add_job(ids.user, "install", server_id=None)

    with app.app_context():
        assert run_worker_once() is False

    assert load_job(install_id).state == "queued"


def test_server_job_not_claimed_while_install_runs(seeded_worker) -> None:
    """Server jobs wait while an install job is running."""
    app, ids = seeded_worker
    add_job(ids.user, "install", server_id=None, state="running")
    queued_id = add_job(ids.user, "initialize", server_id=ids.server_one)

    with app.app_context():
        assert run_worker_once() is False

    assert load_job(queued_id).state == "queued"


def test_recover_stale_running_jobs(worker_app) -> None:
    """A job already persisted as "running" is counted by ``recover_stale_jobs``
    and marked failed with a ``finished_at`` timestamp."""
    with session_scope() as session:
        user = User(username="bob", password_digest=hash_password("secret"), admin=False)
        session.add(user)
        session.flush()
        job = Job(user_id=user.id, server_id=None, operation="install", state="running")
        session.add(job)
        session.flush()
        job_id = job.id

    with worker_app.app_context():
        assert recover_stale_jobs() == 1

    recovered = load_job(job_id)
    assert recovered.state == "failed"
    assert recovered.finished_at is not None


def test_worker_startup_skipped_during_testing(monkeypatch, tmp_path) -> None:
    """``create_app`` does not start job workers when ``TESTING`` is true."""
    from l4d2web import app as app_module

    called = []
    db_url = f"sqlite:///{tmp_path/'startup.db'}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))

    app_module.create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})

    assert called == []


def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
    """``create_app`` starts job workers exactly once, for the created app, when
    ``TESTING`` is false."""
    from l4d2web import app as app_module

    called = []
    db_url = f"sqlite:///{tmp_path/'startup-enabled.db'}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))

    app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})

    assert called == [app]