From 0dc61d5de479fa8f8c5dec4c50682b6124ebc693 Mon Sep 17 00:00:00 2001 From: mwiegand Date: Tue, 12 May 2026 22:10:55 +0200 Subject: [PATCH] feat(live-state): start daemon poller, prune history, close stuck sessions Co-Authored-By: Claude Sonnet 4.6 --- l4d2web/app.py | 3 + l4d2web/config.py | 16 ++++++ l4d2web/services/live_state_poller.py | 73 ++++++++++++++++++++++++- l4d2web/tests/test_live_state_poller.py | 73 +++++++++++++++++++++++++ 4 files changed, 164 insertions(+), 1 deletion(-) diff --git a/l4d2web/app.py b/l4d2web/app.py index f952c70..120ec26 100644 --- a/l4d2web/app.py +++ b/l4d2web/app.py @@ -25,6 +25,7 @@ from l4d2web.services.job_worker import ( start_job_workers, start_state_poller, ) +from l4d2web.services.live_state_poller import start_live_state_poller def _in_flask_cli_context() -> bool: @@ -98,6 +99,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: recover_stale_jobs() start_job_workers(app) start_state_poller(app) + if not app.config.get("TESTING"): + start_live_state_poller(app) @app.get("/health") def health(): diff --git a/l4d2web/config.py b/l4d2web/config.py index bef5da8..c046ac0 100644 --- a/l4d2web/config.py +++ b/l4d2web/config.py @@ -13,6 +13,14 @@ DEFAULT_CONFIG: dict[str, object] = { "JOB_LOG_LINE_MAX_CHARS": 4096, "PORT_RANGE_START": 27015, "PORT_RANGE_END": 27115, + "LIVE_STATE_POLL_SECONDS": 5, + "LIVE_STATE_QUERY_TIMEOUT_SECONDS": 2.0, + "LIVE_STATE_STALE_SECONDS": 30, + "LIVE_STATE_HISTORY_DAYS": 30, + "LIVE_STATE_RETENTION_EVERY_TICKS": 60, + "STUCK_SESSION_SECONDS": 60, + "STEAM_PROFILE_TTL_SECONDS": 86400, + "STEAM_WEB_API_KEY": "", } @@ -33,4 +41,12 @@ def load_config() -> dict[str, object]: "JOB_LOG_LINE_MAX_CHARS": int(os.getenv("JOB_LOG_LINE_MAX_CHARS", "4096")), "PORT_RANGE_START": int(os.getenv("LEFT4ME_PORT_RANGE_START", "27015")), "PORT_RANGE_END": int(os.getenv("LEFT4ME_PORT_RANGE_END", "27115")), + "LIVE_STATE_POLL_SECONDS": float(os.getenv("LIVE_STATE_POLL_SECONDS", "5")), + "LIVE_STATE_QUERY_TIMEOUT_SECONDS": float(os.getenv("LIVE_STATE_QUERY_TIMEOUT_SECONDS", "2.0")), + "LIVE_STATE_STALE_SECONDS": int(os.getenv("LIVE_STATE_STALE_SECONDS", "30")), + "LIVE_STATE_HISTORY_DAYS": int(os.getenv("LIVE_STATE_HISTORY_DAYS", "30")), + "LIVE_STATE_RETENTION_EVERY_TICKS": int(os.getenv("LIVE_STATE_RETENTION_EVERY_TICKS", "60")), + "STUCK_SESSION_SECONDS": int(os.getenv("STUCK_SESSION_SECONDS", "60")), + "STEAM_PROFILE_TTL_SECONDS": int(os.getenv("STEAM_PROFILE_TTL_SECONDS", "86400")), + "STEAM_WEB_API_KEY": os.getenv("STEAM_WEB_API_KEY", ""), } diff --git a/l4d2web/services/live_state_poller.py b/l4d2web/services/live_state_poller.py index ece58d9..87fda4b 100644 --- a/l4d2web/services/live_state_poller.py +++ b/l4d2web/services/live_state_poller.py @@ -12,6 +12,8 @@ This file is built up across Tasks 6-10. from __future__ import annotations import logging +import threading +import time from datetime import datetime, timedelta, UTC from flask import current_app @@ -47,12 +49,14 @@ def poll_once() -> None: api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or "" ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400)) + timeout = float(current_app.config.get("LIVE_STATE_QUERY_TIMEOUT_SECONDS", 2.0)) for server_id, port, password in targets: try: - status = query_status("127.0.0.1", port, password, timeout=2.0) + status = query_status("127.0.0.1", port, password, timeout=timeout) except RconError: logger.warning("rcon query failed for server %d", server_id, exc_info=True) + _close_stuck_sessions(server_id) continue _record_snapshot(server_id, status) @@ -100,6 +104,37 @@ def _enrich_profiles(status: StatusResponse, *, api_key: str, ttl_seconds: int) row.fetched_at = now +def prune_history() -> None: + """Delete snapshots and closed sessions older than retention.""" + days = int(current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30)) + cutoff = _now() - timedelta(days=days) + with session_scope() as db: + db.query(ServerLiveState).filter( + ServerLiveState.last_seen_at < cutoff + ).delete(synchronize_session=False) + db.query(ServerPlayerSession).filter( + ServerPlayerSession.left_at.is_not(None), + ServerPlayerSession.left_at < cutoff, + ).delete(synchronize_session=False) + + +def _close_stuck_sessions(server_id: int) -> None: + """If a server has open sessions whose joined_at is older than threshold + AND we've failed to observe them, close them as stuck.""" + seconds = int(current_app.config.get("STUCK_SESSION_SECONDS", 60)) + cutoff = _now() - timedelta(seconds=seconds) + with session_scope() as db: + rows = db.scalars( + select(ServerPlayerSession).where( + ServerPlayerSession.server_id == server_id, + ServerPlayerSession.left_at.is_(None), + ServerPlayerSession.joined_at < cutoff, + ) + ).all() + for row in rows: + row.left_at = _now() + + def _record_snapshot(server_id: int, status: StatusResponse) -> None: """RLE write: bump last_seen_at if state matches, else insert a new row.""" now = _now() @@ -186,3 +221,39 @@ def _reconcile_sessions(server_id: int, status: StatusResponse) -> None: existing.min_ping = p.ping if p.ping > existing.max_ping: existing.max_ping = p.ping + + +_poller_started_lock = threading.Lock() +_poller_started = False + + +def start_live_state_poller(app) -> None: + """Spawn the daemon poller thread once per process.""" + global _poller_started + with _poller_started_lock: + if _poller_started: + return + _poller_started = True + interval = float(app.config.get("LIVE_STATE_POLL_SECONDS", 5)) + retention_every = max(1, int(app.config.get("LIVE_STATE_RETENTION_EVERY_TICKS", 60))) + thread = threading.Thread( + target=_poller_loop, + args=(app, interval, retention_every), + name="left4me-live-state-poller", + daemon=True, + ) + thread.start() + + +def _poller_loop(app, interval: float, retention_every: int) -> None: + tick = 0 + while True: + try: + with app.app_context(): + poll_once() + if tick % retention_every == 0: + prune_history() + except Exception: + logger.exception("live-state poller loop exception") + tick += 1 + time.sleep(interval) diff --git a/l4d2web/tests/test_live_state_poller.py b/l4d2web/tests/test_live_state_poller.py index 45026ce..efc2955 100644 --- a/l4d2web/tests/test_live_state_poller.py +++ b/l4d2web/tests/test_live_state_poller.py @@ -272,6 +272,79 @@ def test_skips_enrichment_when_api_key_unset(tmp_path, monkeypatch) -> None: live_state_poller.poll_once() +def test_retention_trims_old_rows(tmp_path, monkeypatch) -> None: + app, sid = _seed(tmp_path) + app.config["LIVE_STATE_HISTORY_DAYS"] = 30 + long_ago = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=45) + with session_scope() as db: + db.add(ServerLiveState( + server_id=sid, started_at=long_ago, last_seen_at=long_ago, + players=0, max_players=4, bots=0, map="old", hibernating=True, + )) + db.add(ServerPlayerSession( + server_id=sid, steam_id_64="76561197960828710", + joined_at=long_ago, left_at=long_ago, + name_at_join="OldPlayer", min_ping=10, max_ping=10, + )) + + with app.app_context(): + live_state_poller.prune_history() + + with session_scope() as db: + snaps = db.scalars(select(ServerLiveState)).all() + sess = db.scalars(select(ServerPlayerSession)).all() + assert snaps == [] + assert sess == [] + + +def test_close_stuck_sessions_after_threshold(tmp_path, monkeypatch) -> None: + app, sid = _seed(tmp_path) + app.config["STUCK_SESSION_SECONDS"] = 60 + way_back = datetime.now(UTC).replace(tzinfo=None) - timedelta(hours=2) + with session_scope() as db: + db.add(ServerPlayerSession( + server_id=sid, steam_id_64="76561197960828710", + joined_at=way_back, left_at=None, + name_at_join="GhostPlayer", min_ping=10, max_ping=10, + )) + + # Server is in `targets` but the RCON call fails — i.e., we haven't seen + # this server respond in a long time. Poller must close stuck sessions. + def boom(*a, **kw): + from l4d2web.services.rcon import RconError + raise RconError("simulated outage") + monkeypatch.setattr(live_state_poller, "query_status", boom) + + with app.app_context(): + live_state_poller.poll_once() + + with session_scope() as db: + row = db.scalar(select(ServerPlayerSession)) + assert row.left_at is not None + + +def test_start_live_state_poller_skipped_during_testing(monkeypatch, tmp_path) -> None: + from l4d2web import app as app_module + called: list = [] + monkeypatch.setattr( + app_module, "start_live_state_poller", lambda app: called.append(app) + ) + app_module.create_app({"TESTING": True, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"}) + assert called == [] + + +def test_start_live_state_poller_started_outside_testing(monkeypatch, tmp_path) -> None: + from l4d2web import app as app_module + called: list = [] + monkeypatch.setattr( + app_module, "start_live_state_poller", lambda app: called.append(app) + ) + app = app_module.create_app( + {"TESTING": False, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"} + ) + assert called == [app] + + def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) app.config["STEAM_WEB_API_KEY"] = "KEY"