From f48d624dcc033756aff1e30487a313f7b87e1bb0 Mon Sep 17 00:00:00 2001 From: mwiegand Date: Tue, 12 May 2026 21:53:58 +0200 Subject: [PATCH] feat(live-state): poller writes RLE snapshots to server_live_state Co-Authored-By: Claude Sonnet 4.6 --- l4d2web/services/live_state_poller.py | 93 ++++++++++++++++++ l4d2web/tests/test_live_state_poller.py | 124 ++++++++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 l4d2web/services/live_state_poller.py create mode 100644 l4d2web/tests/test_live_state_poller.py diff --git a/l4d2web/services/live_state_poller.py b/l4d2web/services/live_state_poller.py new file mode 100644 index 0000000..cc78c88 --- /dev/null +++ b/l4d2web/services/live_state_poller.py @@ -0,0 +1,93 @@ +"""Background poller that maintains live game-server state in the DB. + +Modeled on l4d2web/services/job_worker.py:617-647. This module owns: + - per-server snapshot writes with run-length encoding into + `server_live_state` + - player-session lifecycle in `server_player_session` (Task 7) + - Steam profile enrichment into `steam_user_profile` (Task 8) + - retention pruning and stuck-session closure (Task 10) + +This file is built up across Tasks 6-10. +""" +from __future__ import annotations + +import logging +import threading +import time +from datetime import datetime, UTC +from typing import Callable + +from sqlalchemy import select + +from l4d2web.db import session_scope +from l4d2web.models import ( + Server, + ServerLiveState, +) +from l4d2web.services.rcon import RconError, StatusResponse, query_status + + +logger = logging.getLogger(__name__) + + +def _now() -> datetime: + return datetime.now(UTC).replace(tzinfo=None) + + +def poll_once() -> None: + """One pass over all running servers with a configured rcon_password.""" + with session_scope() as db: + servers = db.scalars( + select(Server) + .where(Server.actual_state == "running") + .where(Server.rcon_password != "") + ).all() + targets = [(s.id, s.port, s.rcon_password) for s in servers] + + for server_id, port, password in targets: + try: + status = query_status("127.0.0.1", port, password, timeout=2.0) + except RconError: + logger.warning("rcon query failed for server %d", server_id, exc_info=True) + continue + + _record_snapshot(server_id, status) + + +def _record_snapshot(server_id: int, status: StatusResponse) -> None: + """RLE write: bump last_seen_at if state matches, else insert a new row.""" + now = _now() + with session_scope() as db: + latest = db.scalars( + select(ServerLiveState) + .where(ServerLiveState.server_id == server_id) + .order_by(ServerLiveState.started_at.desc()) + .limit(1) + ).first() + + if latest is not None and _matches(latest, status): + latest.last_seen_at = now + return + + db.add( + ServerLiveState( + server_id=server_id, + started_at=now, + last_seen_at=now, + players=status.players, + max_players=status.max_players, + bots=status.bots, + map=status.map, + hibernating=status.hibernating, + ) + ) + + +def _matches(row: ServerLiveState, status: StatusResponse) -> bool: + return ( + row.players == status.players + and row.max_players == status.max_players + and row.bots == status.bots + and row.map == status.map + and row.hibernating == status.hibernating + ) diff --git a/l4d2web/tests/test_live_state_poller.py b/l4d2web/tests/test_live_state_poller.py new file mode 100644 index 0000000..865672f --- /dev/null +++ b/l4d2web/tests/test_live_state_poller.py @@ -0,0 +1,124 @@ +"""Live-state poller tests. + +Each test seeds an app + DB, monkeypatches the RCON client to return a +canned StatusResponse, and asserts on what the poller writes. +""" +from __future__ import annotations + +from datetime import datetime, timedelta, UTC + +import pytest +from sqlalchemy import select + +from l4d2web.app import create_app +from l4d2web.db import init_db, session_scope +from l4d2web.models import ( + Blueprint, + Server, + ServerLiveState, + User, +) +from l4d2web.services import live_state_poller +from l4d2web.services.rcon import PlayerRow, StatusResponse + + +def _seed(tmp_path): + db_url = f"sqlite:///{tmp_path/'p.db'}" + import os + os.environ["DATABASE_URL"] = db_url + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "x"}) + init_db() + with session_scope() as db: + u = User(username="u", password_digest="x"); db.add(u); db.flush() + bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush() + s = Server( + user_id=u.id, blueprint_id=bp.id, name="s", port=27500, + rcon_password="pw", actual_state="running", + ) + db.add(s); db.flush() + return app, s.id + + +def _status(players: int, map_: str = "c1m1_hotel", hibernating: bool = False, + roster: list[PlayerRow] | None = None) -> StatusResponse: + return StatusResponse( + map=map_, players=players, max_players=4, bots=0, + hibernating=hibernating, roster=roster or [], + ) + + +def test_rle_bumps_last_seen_when_state_unchanged(tmp_path, monkeypatch) -> None: + app, sid = _seed(tmp_path) + monkeypatch.setattr( + live_state_poller, "query_status", + lambda host, port, password, timeout: _status(players=0), + ) + + with app.app_context(): + live_state_poller.poll_once() + live_state_poller.poll_once() + + with session_scope() as db: + rows = db.scalars( + select(ServerLiveState).where(ServerLiveState.server_id == sid) + .order_by(ServerLiveState.started_at) + ).all() + assert len(rows) == 1 + assert rows[0].players == 0 + assert rows[0].last_seen_at >= rows[0].started_at + + +def test_rle_inserts_new_row_on_state_change(tmp_path, monkeypatch) -> None: + app, sid = _seed(tmp_path) + snapshots = iter([_status(players=0), _status(players=1)]) + monkeypatch.setattr( + live_state_poller, "query_status", + lambda *a, **kw: next(snapshots), + ) + + with app.app_context(): + live_state_poller.poll_once() + live_state_poller.poll_once() + + with session_scope() as db: + rows = db.scalars( + select(ServerLiveState).where(ServerLiveState.server_id == sid) + .order_by(ServerLiveState.started_at) + ).all() + assert [r.players for r in rows] == [0, 1] + + +def test_skips_servers_without_rcon_password(tmp_path, monkeypatch) -> None: + app, sid = _seed(tmp_path) + with session_scope() as db: + s = db.scalar(select(Server).where(Server.id == sid)) + s.rcon_password = "" + + called: list = [] + monkeypatch.setattr( + live_state_poller, "query_status", + lambda *a, **kw: called.append(1) or _status(0), + ) + + with app.app_context(): + live_state_poller.poll_once() + + assert called == [] + + +def test_skips_non_running_servers(tmp_path, monkeypatch) -> None: + app, sid = _seed(tmp_path) + with session_scope() as db: + s = db.scalar(select(Server).where(Server.id == sid)) + s.actual_state = "stopped" + + called: list = [] + monkeypatch.setattr( + live_state_poller, "query_status", + lambda *a, **kw: called.append(1) or _status(0), + ) + + with app.app_context(): + live_state_poller.poll_once() + + assert called == []