feat(l4d2-web): periodic state poller refreshes Server.actual_state
A background thread spawned alongside the job workers polls every server's status every STATE_POLLER_INTERVAL_SECONDS (default 30) and writes the result via the existing refresh_server_actual_state path. Servers with in-flight jobs (queued/running/cancelling) are skipped to avoid racing the post-job refresh. Catches reboot drift, OOM kills, manual systemctl operations, and any other out-of-band state change. Spec: docs/superpowers/specs/2026-05-09-l4d2-server-lifecycle-reboot-and-drift-design.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8552c559d3
commit
67b5521eb6
4 changed files with 137 additions and 1 deletions
|
|
@ -18,7 +18,11 @@ from l4d2web.routes.overlay_routes import bp as overlay_bp
|
||||||
from l4d2web.routes.page_routes import bp as page_bp
|
from l4d2web.routes.page_routes import bp as page_bp
|
||||||
from l4d2web.routes.server_routes import bp as server_bp
|
from l4d2web.routes.server_routes import bp as server_bp
|
||||||
from l4d2web.routes.workshop_routes import bp as workshop_bp
|
from l4d2web.routes.workshop_routes import bp as workshop_bp
|
||||||
from l4d2web.services.job_worker import recover_stale_jobs, start_job_workers
|
from l4d2web.services.job_worker import (
|
||||||
|
recover_stale_jobs,
|
||||||
|
start_job_workers,
|
||||||
|
start_state_poller,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _in_flask_cli_context() -> bool:
|
def _in_flask_cli_context() -> bool:
|
||||||
|
|
@ -89,6 +93,7 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||||
if should_start_workers:
|
if should_start_workers:
|
||||||
recover_stale_jobs()
|
recover_stale_jobs()
|
||||||
start_job_workers(app)
|
start_job_workers(app)
|
||||||
|
start_state_poller(app)
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
def health():
|
def health():
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ DEFAULT_CONFIG: dict[str, object] = {
|
||||||
"JOB_WORKER_THREADS": 4,
|
"JOB_WORKER_THREADS": 4,
|
||||||
"JOB_WORKER_ENABLED": True,
|
"JOB_WORKER_ENABLED": True,
|
||||||
"JOB_WORKER_POLL_SECONDS": 1,
|
"JOB_WORKER_POLL_SECONDS": 1,
|
||||||
|
"STATE_POLLER_INTERVAL_SECONDS": 30,
|
||||||
"JOB_LOG_REPLAY_LIMIT": 2000,
|
"JOB_LOG_REPLAY_LIMIT": 2000,
|
||||||
"JOB_LOG_LINE_MAX_CHARS": 4096,
|
"JOB_LOG_LINE_MAX_CHARS": 4096,
|
||||||
"PORT_RANGE_START": 27015,
|
"PORT_RANGE_START": 27015,
|
||||||
|
|
@ -27,6 +28,7 @@ def load_config() -> dict[str, object]:
|
||||||
"JOB_WORKER_THREADS": int(os.getenv("JOB_WORKER_THREADS", "4")),
|
"JOB_WORKER_THREADS": int(os.getenv("JOB_WORKER_THREADS", "4")),
|
||||||
"JOB_WORKER_ENABLED": _bool_from_env(os.getenv("JOB_WORKER_ENABLED", "true")),
|
"JOB_WORKER_ENABLED": _bool_from_env(os.getenv("JOB_WORKER_ENABLED", "true")),
|
||||||
"JOB_WORKER_POLL_SECONDS": float(os.getenv("JOB_WORKER_POLL_SECONDS", "1")),
|
"JOB_WORKER_POLL_SECONDS": float(os.getenv("JOB_WORKER_POLL_SECONDS", "1")),
|
||||||
|
"STATE_POLLER_INTERVAL_SECONDS": float(os.getenv("STATE_POLLER_INTERVAL_SECONDS", "30")),
|
||||||
"JOB_LOG_REPLAY_LIMIT": int(os.getenv("JOB_LOG_REPLAY_LIMIT", "2000")),
|
"JOB_LOG_REPLAY_LIMIT": int(os.getenv("JOB_LOG_REPLAY_LIMIT", "2000")),
|
||||||
"JOB_LOG_LINE_MAX_CHARS": int(os.getenv("JOB_LOG_LINE_MAX_CHARS", "4096")),
|
"JOB_LOG_LINE_MAX_CHARS": int(os.getenv("JOB_LOG_LINE_MAX_CHARS", "4096")),
|
||||||
"PORT_RANGE_START": int(os.getenv("LEFT4ME_PORT_RANGE_START", "27015")),
|
"PORT_RANGE_START": int(os.getenv("LEFT4ME_PORT_RANGE_START", "27015")),
|
||||||
|
|
|
||||||
|
|
@ -614,3 +614,45 @@ def worker_loop(app, poll_seconds: float) -> None:
|
||||||
ran_job = False
|
ran_job = False
|
||||||
if not ran_job:
|
if not ran_job:
|
||||||
time.sleep(poll_seconds)
|
time.sleep(poll_seconds)
|
||||||
|
|
||||||
|
|
||||||
|
def start_state_poller(app) -> None:
    """Spawn the daemon thread that runs ``state_poller_loop`` for ``app``.

    The pause between polls is taken from
    ``app.config["STATE_POLLER_INTERVAL_SECONDS"]`` (30 when the key is
    missing) and coerced to ``float`` before being handed to the loop.
    """
    poll_every = float(app.config.get("STATE_POLLER_INTERVAL_SECONDS", 30))
    # daemon=True: the poller must never keep the process alive on shutdown.
    threading.Thread(
        name="left4me-state-poller",
        target=state_poller_loop,
        args=(app, poll_every),
        daemon=True,
    ).start()
|
||||||
|
|
||||||
|
|
||||||
|
def state_poller_loop(app, interval: float) -> None:
    """Run ``poll_all_servers`` forever, sleeping ``interval`` seconds between passes.

    Every pass executes inside ``app.app_context()``. A pass that raises is
    logged with its traceback and the loop carries on, so a single bad
    iteration does not end the poller thread. This function never returns.

    Args:
        app: Application object providing ``app_context()``.
        interval: Seconds to sleep after each pass, whether it succeeded or not.
    """
    # Function-scope import: the module's import block is not visible from here.
    import logging

    log = logging.getLogger(__name__)
    while True:
        try:
            with app.app_context():
                poll_all_servers()
        except Exception:
            # Still best-effort, but record the failure instead of discarding
            # it so a persistently broken poller can be diagnosed.
            log.exception("state poller pass failed")
        time.sleep(interval)
|
||||||
|
|
||||||
|
|
||||||
|
def poll_all_servers() -> None:
    """Refresh the actual state of every server that has no in-flight job.

    A server is skipped when any job referencing it is in the ``queued``,
    ``running`` or ``cancelling`` state. Each remaining server id is passed to
    ``refresh_server_actual_state``; a failure for one server is logged and
    does not prevent the others from being refreshed.
    """
    # Function-scope import: the module's import block is not visible from here.
    import logging

    log = logging.getLogger(__name__)

    with session_scope() as db:
        busy_query = select(Job.server_id).where(
            Job.state.in_(("queued", "running", "cancelling"))
        )
        active_server_ids = set(db.scalars(busy_query).all())
        server_ids = [
            sid
            for sid in db.scalars(select(Server.id)).all()
            if sid not in active_server_ids
        ]

    # Only plain ids are needed from here on, so the refreshes run after the
    # session scope has exited rather than holding it open across every call.
    # NOTE(review): assumes refresh_server_actual_state manages its own DB
    # access — confirm against its definition.
    for sid in server_ids:
        try:
            refresh_server_actual_state(sid)
        except Exception:
            # Best-effort per server: log which one failed and keep going.
            log.exception("failed to refresh actual state for server %s", sid)
|
||||||
|
|
|
||||||
|
|
@ -843,3 +843,90 @@ def test_build_overlay_script_type_blocks_per_overlay(overlay_seeded_worker) ->
|
||||||
can_start(DummyJob(operation="build_overlay", overlay_id=ids.overlay + 1), state)
|
can_start(DummyJob(operation="build_overlay", overlay_id=ids.overlay + 1), state)
|
||||||
is True
|
is True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# State poller tests — refresh Server.actual_state out-of-band so OOM kills,
|
||||||
|
# manual systemctl ops, and reboots no longer leave the DB on stale "running".
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_poller_refreshes_each_server(seeded_worker, monkeypatch) -> None:
    """A poll pass hands both seeded server ids to the refresh hook."""
    from l4d2web.services import job_worker as jw

    worker_app, ids = seeded_worker
    seen: list[int] = []

    def record(sid):
        seen.append(sid)

    monkeypatch.setattr(jw, "refresh_server_actual_state", record)

    with worker_app.app_context():
        jw.poll_all_servers()

    expected = sorted([ids.server_one, ids.server_two])
    assert sorted(seen) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_poller_skips_servers_with_inflight_jobs(seeded_worker, monkeypatch) -> None:
    """A server with a running job is left alone; the other one is refreshed."""
    from l4d2web.services import job_worker as jw

    worker_app, ids = seeded_worker

    # Give server_one an in-flight job before polling.
    add_job(ids.user, "stop", server_id=ids.server_one, state="running")

    seen: list[int] = []

    def record(sid):
        seen.append(sid)

    monkeypatch.setattr(jw, "refresh_server_actual_state", record)

    with worker_app.app_context():
        jw.poll_all_servers()

    assert ids.server_one not in seen
    assert ids.server_two in seen
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_poller_swallows_per_server_exceptions(seeded_worker, monkeypatch) -> None:
    """A refresh failure for one server must not stop the pass or propagate."""
    from l4d2web.services import job_worker as jw

    worker_app, ids = seeded_worker
    succeeded: list[int] = []

    def flaky_refresh(sid: int) -> None:
        # server_one always blows up; every other id is recorded.
        if sid != ids.server_one:
            succeeded.append(sid)
            return
        raise RuntimeError("simulated host failure")

    monkeypatch.setattr(jw, "refresh_server_actual_state", flaky_refresh)

    with worker_app.app_context():
        # Must return normally despite the RuntimeError above.
        jw.poll_all_servers()

    assert succeeded == [ids.server_two]
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_poller_not_started_during_testing(monkeypatch, tmp_path) -> None:
    """``create_app`` with ``TESTING=True`` never calls ``start_state_poller``."""
    from l4d2web import app as app_module

    started_for: list = []

    def spy(app):
        started_for.append(app)

    monkeypatch.setattr(app_module, "start_state_poller", spy)

    config = {
        "TESTING": True,
        "DATABASE_URL": f"sqlite:///{tmp_path/'poller-testing.db'}",
        "SECRET_KEY": "test",
    }
    app_module.create_app(config)

    assert started_for == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_poller_started_when_workers_enabled_outside_testing(monkeypatch, tmp_path) -> None:
    """With ``TESTING=False`` the poller is started exactly once, with the new app."""
    from l4d2web import app as app_module

    started_for: list = []

    def spy(app):
        started_for.append(app)

    monkeypatch.setattr(app_module, "start_state_poller", spy)
    # Stub out the real worker startup and stale-job recovery.
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: None)
    monkeypatch.setattr(app_module, "recover_stale_jobs", lambda: None)

    config = {
        "TESTING": False,
        "DATABASE_URL": f"sqlite:///{tmp_path/'poller-enabled.db'}",
        "SECRET_KEY": "test",
    }
    app = app_module.create_app(config)

    assert started_for == [app]
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue