feat(live-state): poller writes RLE snapshots to server_live_state
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
f88d07a473
commit
f48d624dcc
2 changed files with 217 additions and 0 deletions
93
l4d2web/services/live_state_poller.py
Normal file
93
l4d2web/services/live_state_poller.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
"""Background poller that maintains live game-server state in the DB.
|
||||
|
||||
Modeled on l4d2web/services/job_worker.py:617-647. This module owns:
|
||||
- per-server snapshot writes with run-length encoding into
|
||||
`server_live_state`
|
||||
- player-session lifecycle in `server_player_session` (Task 7)
|
||||
- Steam profile enrichment into `steam_user_profile` (Task 8)
|
||||
- retention pruning and stuck-session closure (Task 10)
|
||||
|
||||
This file is built up across Tasks 6-10.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, UTC
|
||||
from typing import Callable
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import (
|
||||
Server,
|
||||
ServerLiveState,
|
||||
)
|
||||
from l4d2web.services.rcon import RconError, StatusResponse, query_status
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _now() -> datetime:
|
||||
return datetime.now(UTC).replace(tzinfo=None)
|
||||
|
||||
|
||||
def poll_once() -> None:
|
||||
"""One pass over all running servers with a configured rcon_password."""
|
||||
with session_scope() as db:
|
||||
servers = db.scalars(
|
||||
select(Server)
|
||||
.where(Server.actual_state == "running")
|
||||
.where(Server.rcon_password != "")
|
||||
).all()
|
||||
targets = [(s.id, s.port, s.rcon_password) for s in servers]
|
||||
|
||||
for server_id, port, password in targets:
|
||||
try:
|
||||
status = query_status("127.0.0.1", port, password, timeout=2.0)
|
||||
except RconError:
|
||||
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
|
||||
continue
|
||||
|
||||
_record_snapshot(server_id, status)
|
||||
|
||||
|
||||
def _record_snapshot(server_id: int, status: StatusResponse) -> None:
|
||||
"""RLE write: bump last_seen_at if state matches, else insert a new row."""
|
||||
now = _now()
|
||||
with session_scope() as db:
|
||||
latest = db.scalars(
|
||||
select(ServerLiveState)
|
||||
.where(ServerLiveState.server_id == server_id)
|
||||
.order_by(ServerLiveState.started_at.desc())
|
||||
.limit(1)
|
||||
).first()
|
||||
|
||||
if latest is not None and _matches(latest, status):
|
||||
latest.last_seen_at = now
|
||||
return
|
||||
|
||||
db.add(
|
||||
ServerLiveState(
|
||||
server_id=server_id,
|
||||
started_at=now,
|
||||
last_seen_at=now,
|
||||
players=status.players,
|
||||
max_players=status.max_players,
|
||||
bots=status.bots,
|
||||
map=status.map,
|
||||
hibernating=status.hibernating,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _matches(row: ServerLiveState, status: StatusResponse) -> bool:
|
||||
return (
|
||||
row.players == status.players
|
||||
and row.max_players == status.max_players
|
||||
and row.bots == status.bots
|
||||
and row.map == status.map
|
||||
and row.hibernating == status.hibernating
|
||||
)
|
||||
124
l4d2web/tests/test_live_state_poller.py
Normal file
124
l4d2web/tests/test_live_state_poller.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
"""Live-state poller tests.
|
||||
|
||||
Each test seeds an app + DB, monkeypatches the RCON client to return a
|
||||
canned StatusResponse, and asserts on what the poller writes.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, UTC
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import (
|
||||
Blueprint,
|
||||
Server,
|
||||
ServerLiveState,
|
||||
User,
|
||||
)
|
||||
from l4d2web.services import live_state_poller
|
||||
from l4d2web.services.rcon import PlayerRow, StatusResponse
|
||||
|
||||
|
||||
def _seed(tmp_path):
|
||||
db_url = f"sqlite:///{tmp_path/'p.db'}"
|
||||
import os
|
||||
os.environ["DATABASE_URL"] = db_url
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "x"})
|
||||
init_db()
|
||||
with session_scope() as db:
|
||||
u = User(username="u", password_digest="x"); db.add(u); db.flush()
|
||||
bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
|
||||
s = Server(
|
||||
user_id=u.id, blueprint_id=bp.id, name="s", port=27500,
|
||||
rcon_password="pw", actual_state="running",
|
||||
)
|
||||
db.add(s); db.flush()
|
||||
return app, s.id
|
||||
|
||||
|
||||
def _status(players: int, map_: str = "c1m1_hotel", hibernating: bool = False,
|
||||
roster: list[PlayerRow] | None = None) -> StatusResponse:
|
||||
return StatusResponse(
|
||||
map=map_, players=players, max_players=4, bots=0,
|
||||
hibernating=hibernating, roster=roster or [],
|
||||
)
|
||||
|
||||
|
||||
def test_rle_bumps_last_seen_when_state_unchanged(tmp_path, monkeypatch) -> None:
|
||||
app, sid = _seed(tmp_path)
|
||||
monkeypatch.setattr(
|
||||
live_state_poller, "query_status",
|
||||
lambda host, port, password, timeout: _status(players=0),
|
||||
)
|
||||
|
||||
with app.app_context():
|
||||
live_state_poller.poll_once()
|
||||
live_state_poller.poll_once()
|
||||
|
||||
with session_scope() as db:
|
||||
rows = db.scalars(
|
||||
select(ServerLiveState).where(ServerLiveState.server_id == sid)
|
||||
.order_by(ServerLiveState.started_at)
|
||||
).all()
|
||||
assert len(rows) == 1
|
||||
assert rows[0].players == 0
|
||||
assert rows[0].last_seen_at >= rows[0].started_at
|
||||
|
||||
|
||||
def test_rle_inserts_new_row_on_state_change(tmp_path, monkeypatch) -> None:
|
||||
app, sid = _seed(tmp_path)
|
||||
snapshots = iter([_status(players=0), _status(players=1)])
|
||||
monkeypatch.setattr(
|
||||
live_state_poller, "query_status",
|
||||
lambda *a, **kw: next(snapshots),
|
||||
)
|
||||
|
||||
with app.app_context():
|
||||
live_state_poller.poll_once()
|
||||
live_state_poller.poll_once()
|
||||
|
||||
with session_scope() as db:
|
||||
rows = db.scalars(
|
||||
select(ServerLiveState).where(ServerLiveState.server_id == sid)
|
||||
.order_by(ServerLiveState.started_at)
|
||||
).all()
|
||||
assert [r.players for r in rows] == [0, 1]
|
||||
|
||||
|
||||
def test_skips_servers_without_rcon_password(tmp_path, monkeypatch) -> None:
|
||||
app, sid = _seed(tmp_path)
|
||||
with session_scope() as db:
|
||||
s = db.scalar(select(Server).where(Server.id == sid))
|
||||
s.rcon_password = ""
|
||||
|
||||
called: list = []
|
||||
monkeypatch.setattr(
|
||||
live_state_poller, "query_status",
|
||||
lambda *a, **kw: called.append(1) or _status(0),
|
||||
)
|
||||
|
||||
with app.app_context():
|
||||
live_state_poller.poll_once()
|
||||
|
||||
assert called == []
|
||||
|
||||
|
||||
def test_skips_non_running_servers(tmp_path, monkeypatch) -> None:
|
||||
app, sid = _seed(tmp_path)
|
||||
with session_scope() as db:
|
||||
s = db.scalar(select(Server).where(Server.id == sid))
|
||||
s.actual_state = "stopped"
|
||||
|
||||
called: list = []
|
||||
monkeypatch.setattr(
|
||||
live_state_poller, "query_status",
|
||||
lambda *a, **kw: called.append(1) or _status(0),
|
||||
)
|
||||
|
||||
with app.app_context():
|
||||
live_state_poller.poll_once()
|
||||
|
||||
assert called == []
|
||||
Loading…
Reference in a new issue