feat(live-state): start daemon poller, prune history, close stuck sessions
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
be476112ee
commit
0dc61d5de4
4 changed files with 164 additions and 1 deletions
|
|
@ -25,6 +25,7 @@ from l4d2web.services.job_worker import (
|
||||||
start_job_workers,
|
start_job_workers,
|
||||||
start_state_poller,
|
start_state_poller,
|
||||||
)
|
)
|
||||||
|
from l4d2web.services.live_state_poller import start_live_state_poller
|
||||||
|
|
||||||
|
|
||||||
def _in_flask_cli_context() -> bool:
|
def _in_flask_cli_context() -> bool:
|
||||||
|
|
@ -98,6 +99,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||||
recover_stale_jobs()
|
recover_stale_jobs()
|
||||||
start_job_workers(app)
|
start_job_workers(app)
|
||||||
start_state_poller(app)
|
start_state_poller(app)
|
||||||
|
if not app.config.get("TESTING"):
|
||||||
|
start_live_state_poller(app)
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
def health():
|
def health():
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,14 @@ DEFAULT_CONFIG: dict[str, object] = {
|
||||||
"JOB_LOG_LINE_MAX_CHARS": 4096,
|
"JOB_LOG_LINE_MAX_CHARS": 4096,
|
||||||
"PORT_RANGE_START": 27015,
|
"PORT_RANGE_START": 27015,
|
||||||
"PORT_RANGE_END": 27115,
|
"PORT_RANGE_END": 27115,
|
||||||
|
"LIVE_STATE_POLL_SECONDS": 5,
|
||||||
|
"LIVE_STATE_QUERY_TIMEOUT_SECONDS": 2.0,
|
||||||
|
"LIVE_STATE_STALE_SECONDS": 30,
|
||||||
|
"LIVE_STATE_HISTORY_DAYS": 30,
|
||||||
|
"LIVE_STATE_RETENTION_EVERY_TICKS": 60,
|
||||||
|
"STUCK_SESSION_SECONDS": 60,
|
||||||
|
"STEAM_PROFILE_TTL_SECONDS": 86400,
|
||||||
|
"STEAM_WEB_API_KEY": "",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -33,4 +41,12 @@ def load_config() -> dict[str, object]:
|
||||||
"JOB_LOG_LINE_MAX_CHARS": int(os.getenv("JOB_LOG_LINE_MAX_CHARS", "4096")),
|
"JOB_LOG_LINE_MAX_CHARS": int(os.getenv("JOB_LOG_LINE_MAX_CHARS", "4096")),
|
||||||
"PORT_RANGE_START": int(os.getenv("LEFT4ME_PORT_RANGE_START", "27015")),
|
"PORT_RANGE_START": int(os.getenv("LEFT4ME_PORT_RANGE_START", "27015")),
|
||||||
"PORT_RANGE_END": int(os.getenv("LEFT4ME_PORT_RANGE_END", "27115")),
|
"PORT_RANGE_END": int(os.getenv("LEFT4ME_PORT_RANGE_END", "27115")),
|
||||||
|
"LIVE_STATE_POLL_SECONDS": float(os.getenv("LIVE_STATE_POLL_SECONDS", "5")),
|
||||||
|
"LIVE_STATE_QUERY_TIMEOUT_SECONDS": float(os.getenv("LIVE_STATE_QUERY_TIMEOUT_SECONDS", "2.0")),
|
||||||
|
"LIVE_STATE_STALE_SECONDS": int(os.getenv("LIVE_STATE_STALE_SECONDS", "30")),
|
||||||
|
"LIVE_STATE_HISTORY_DAYS": int(os.getenv("LIVE_STATE_HISTORY_DAYS", "30")),
|
||||||
|
"LIVE_STATE_RETENTION_EVERY_TICKS": int(os.getenv("LIVE_STATE_RETENTION_EVERY_TICKS", "60")),
|
||||||
|
"STUCK_SESSION_SECONDS": int(os.getenv("STUCK_SESSION_SECONDS", "60")),
|
||||||
|
"STEAM_PROFILE_TTL_SECONDS": int(os.getenv("STEAM_PROFILE_TTL_SECONDS", "86400")),
|
||||||
|
"STEAM_WEB_API_KEY": os.getenv("STEAM_WEB_API_KEY", ""),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,8 @@ This file is built up across Tasks 6-10.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
from datetime import datetime, timedelta, UTC
|
from datetime import datetime, timedelta, UTC
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
|
@ -47,12 +49,14 @@ def poll_once() -> None:
|
||||||
|
|
||||||
api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or ""
|
api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or ""
|
||||||
ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400))
|
ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400))
|
||||||
|
timeout = float(current_app.config.get("LIVE_STATE_QUERY_TIMEOUT_SECONDS", 2.0))
|
||||||
|
|
||||||
for server_id, port, password in targets:
|
for server_id, port, password in targets:
|
||||||
try:
|
try:
|
||||||
status = query_status("127.0.0.1", port, password, timeout=2.0)
|
status = query_status("127.0.0.1", port, password, timeout=timeout)
|
||||||
except RconError:
|
except RconError:
|
||||||
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
|
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
|
||||||
|
_close_stuck_sessions(server_id)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
_record_snapshot(server_id, status)
|
_record_snapshot(server_id, status)
|
||||||
|
|
@ -100,6 +104,37 @@ def _enrich_profiles(status: StatusResponse, *, api_key: str, ttl_seconds: int)
|
||||||
row.fetched_at = now
|
row.fetched_at = now
|
||||||
|
|
||||||
|
|
||||||
|
def prune_history() -> None:
    """Drop live-state history that has aged past the retention window.

    The window length comes from ``LIVE_STATE_HISTORY_DAYS`` in the current
    Flask app config (30 when unset). Two bulk deletes run in one
    ``session_scope()``:

    * ``ServerLiveState`` rows whose ``last_seen_at`` is before the cutoff;
    * ``ServerPlayerSession`` rows that are closed (``left_at`` is not NULL)
      and whose ``left_at`` is before the cutoff. Open sessions are never
      matched.

    Must be called inside an application context because it reads
    ``current_app.config``.
    """
    retention = timedelta(days=int(current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30)))
    oldest_kept = _now() - retention

    with session_scope() as db:
        expired_snapshots = db.query(ServerLiveState).filter(
            ServerLiveState.last_seen_at < oldest_kept
        )
        # Bulk delete; skip syncing in-session objects with the deleted rows.
        expired_snapshots.delete(synchronize_session=False)

        expired_sessions = db.query(ServerPlayerSession).filter(
            ServerPlayerSession.left_at.is_not(None),
            ServerPlayerSession.left_at < oldest_kept,
        )
        expired_sessions.delete(synchronize_session=False)
|
||||||
|
|
||||||
|
|
||||||
|
def _close_stuck_sessions(server_id: int) -> None:
    """Close long-open player sessions for one server.

    Every ``ServerPlayerSession`` row for ``server_id`` that is still open
    (``left_at`` is NULL) and whose ``joined_at`` is older than
    ``STUCK_SESSION_SECONDS`` (60 when unset) gets ``left_at`` set to the
    current time.

    This function does not check whether the server has been observed
    recently; deciding *when* sessions count as unobserved is the caller's
    job (``poll_once`` invokes it when the RCON status query fails).

    NOTE(review): the threshold is applied to ``joined_at`` only, so any
    session that has been open longer than the threshold is closed the first
    time this runs for its server. Confirm that a single failed query is
    meant to end long-running sessions.

    Args:
        server_id: Primary key of the server whose open sessions are checked.
    """
    threshold_seconds = int(current_app.config.get("STUCK_SESSION_SECONDS", 60))
    # Read the clock once so the cutoff and every row closed in this pass
    # share the same timestamp.
    now = _now()
    cutoff = now - timedelta(seconds=threshold_seconds)
    with session_scope() as db:
        stuck_sessions = db.scalars(
            select(ServerPlayerSession).where(
                ServerPlayerSession.server_id == server_id,
                ServerPlayerSession.left_at.is_(None),
                ServerPlayerSession.joined_at < cutoff,
            )
        ).all()
        for session_row in stuck_sessions:
            session_row.left_at = now
|
||||||
|
|
||||||
|
|
||||||
def _record_snapshot(server_id: int, status: StatusResponse) -> None:
|
def _record_snapshot(server_id: int, status: StatusResponse) -> None:
|
||||||
"""RLE write: bump last_seen_at if state matches, else insert a new row."""
|
"""RLE write: bump last_seen_at if state matches, else insert a new row."""
|
||||||
now = _now()
|
now = _now()
|
||||||
|
|
@ -186,3 +221,39 @@ def _reconcile_sessions(server_id: int, status: StatusResponse) -> None:
|
||||||
existing.min_ping = p.ping
|
existing.min_ping = p.ping
|
||||||
if p.ping > existing.max_ping:
|
if p.ping > existing.max_ping:
|
||||||
existing.max_ping = p.ping
|
existing.max_ping = p.ping
|
||||||
|
|
||||||
|
|
||||||
|
# Process-wide guard: the lock serialises start-up, the flag records that the
# poller thread has been started successfully.
_poller_started_lock = threading.Lock()
_poller_started = False


def start_live_state_poller(app) -> None:
    """Spawn the daemon poller thread once per process.

    Reads ``LIVE_STATE_POLL_SECONDS`` (default 5) and
    ``LIVE_STATE_RETENTION_EVERY_TICKS`` (default 60, clamped to at least 1)
    from ``app.config`` and starts a daemon thread running ``_poller_loop``
    with those values. Later calls in the same process return immediately
    once a thread has been started.

    The started flag is set only after ``thread.start()`` returns. If config
    parsing or thread creation raises, the error propagates and a later call
    can try again instead of silently doing nothing.

    Args:
        app: Application object; its ``config`` mapping is read here and the
            object itself is handed to the poller loop.

    Raises:
        ValueError: If a config value cannot be converted to a number.
        RuntimeError: If the thread cannot be started.
    """
    global _poller_started
    with _poller_started_lock:
        if _poller_started:
            return
        interval = float(app.config.get("LIVE_STATE_POLL_SECONDS", 5))
        retention_every = max(1, int(app.config.get("LIVE_STATE_RETENTION_EVERY_TICKS", 60)))
        thread = threading.Thread(
            target=_poller_loop,
            args=(app, interval, retention_every),
            name="left4me-live-state-poller",
            daemon=True,
        )
        thread.start()
        # Only now is it true that a poller exists for this process.
        _poller_started = True
|
||||||
|
|
||||||
|
|
||||||
|
def _poller_loop(app, interval: float, retention_every: int) -> None:
    """Run the live-state poll forever; intended as a thread target.

    Each iteration enters ``app.app_context()``, calls ``poll_once()`` and,
    on iterations whose index is a multiple of ``retention_every`` (including
    the very first), also calls ``prune_history()``. Any ``Exception`` raised
    inside an iteration is logged and swallowed so the loop keeps running.
    The loop then sleeps ``interval`` seconds. It never returns.

    Args:
        app: Application object providing ``app_context()``.
        interval: Seconds to sleep between iterations.
        retention_every: Run history pruning every this many iterations.
    """
    iteration = 0
    while True:
        try:
            with app.app_context():
                poll_once()
                prune_due = iteration % retention_every == 0
                if prune_due:
                    prune_history()
        except Exception:
            # Keep polling on later ticks; a failed tick is only logged.
            logger.exception("live-state poller loop exception")
        iteration += 1
        time.sleep(interval)
|
||||||
|
|
|
||||||
|
|
@ -272,6 +272,79 @@ def test_skips_enrichment_when_api_key_unset(tmp_path, monkeypatch) -> None:
|
||||||
live_state_poller.poll_once()
|
live_state_poller.poll_once()
|
||||||
|
|
||||||
|
|
||||||
|
def test_retention_trims_old_rows(tmp_path, monkeypatch) -> None:
    """A snapshot and a closed session dated 45 days back are pruned at 30-day retention."""
    app, sid = _seed(tmp_path)
    app.config["LIVE_STATE_HISTORY_DAYS"] = 30
    naive_utc_now = datetime.now(UTC).replace(tzinfo=None)
    long_ago = naive_utc_now - timedelta(days=45)

    # Seed one row of each kind, both older than the retention window.
    with session_scope() as db:
        expired_snapshot = ServerLiveState(
            server_id=sid,
            started_at=long_ago,
            last_seen_at=long_ago,
            players=0,
            max_players=4,
            bots=0,
            map="old",
            hibernating=True,
        )
        db.add(expired_snapshot)
        expired_session = ServerPlayerSession(
            server_id=sid,
            steam_id_64="76561197960828710",
            joined_at=long_ago,
            left_at=long_ago,
            name_at_join="OldPlayer",
            min_ping=10,
            max_ping=10,
        )
        db.add(expired_session)

    with app.app_context():
        live_state_poller.prune_history()

    # Both tables must be empty afterwards.
    with session_scope() as db:
        remaining_snapshots = db.scalars(select(ServerLiveState)).all()
        remaining_sessions = db.scalars(select(ServerPlayerSession)).all()
        assert remaining_snapshots == []
        assert remaining_sessions == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_close_stuck_sessions_after_threshold(tmp_path, monkeypatch) -> None:
    """An open session that joined two hours ago is closed when the RCON query fails."""
    app, sid = _seed(tmp_path)
    app.config["STUCK_SESSION_SECONDS"] = 60
    naive_utc_now = datetime.now(UTC).replace(tzinfo=None)
    way_back = naive_utc_now - timedelta(hours=2)

    with session_scope() as db:
        ghost_session = ServerPlayerSession(
            server_id=sid,
            steam_id_64="76561197960828710",
            joined_at=way_back,
            left_at=None,
            name_at_join="GhostPlayer",
            min_ping=10,
            max_ping=10,
        )
        db.add(ghost_session)

    # The server is still a poll target, but every status query raises, as in
    # an outage. The poller is expected to close the stale open session.
    def failing_query(*args, **kwargs):
        from l4d2web.services.rcon import RconError

        raise RconError("simulated outage")

    monkeypatch.setattr(live_state_poller, "query_status", failing_query)

    with app.app_context():
        live_state_poller.poll_once()

    with session_scope() as db:
        stored = db.scalar(select(ServerPlayerSession))
        assert stored.left_at is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_start_live_state_poller_skipped_during_testing(monkeypatch, tmp_path) -> None:
    """create_app must not start the live-state poller when TESTING is set."""
    from l4d2web import app as app_module

    called: list = []

    def record_start(app):
        # Stand-in for the real starter: only remembers what it was given.
        called.append(app)

    monkeypatch.setattr(app_module, "start_live_state_poller", record_start)

    test_config = {
        "TESTING": True,
        "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}",
        "SECRET_KEY": "k",
    }
    app_module.create_app(test_config)

    assert called == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_start_live_state_poller_started_outside_testing(monkeypatch, tmp_path) -> None:
    """create_app starts the live-state poller exactly once when TESTING is off."""
    from l4d2web import app as app_module

    called: list = []

    def record_start(app):
        # Stand-in for the real starter: only remembers what it was given.
        called.append(app)

    monkeypatch.setattr(app_module, "start_live_state_poller", record_start)

    runtime_config = {
        "TESTING": False,
        "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}",
        "SECRET_KEY": "k",
    }
    app = app_module.create_app(runtime_config)

    # The starter was invoked once, with the app that create_app returned.
    assert called == [app]
|
||||||
|
|
||||||
|
|
||||||
def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None:
|
def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None:
|
||||||
app, sid = _seed(tmp_path)
|
app, sid = _seed(tmp_path)
|
||||||
app.config["STEAM_WEB_API_KEY"] = "KEY"
|
app.config["STEAM_WEB_API_KEY"] = "KEY"
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue