left4me/l4d2web/tests/test_live_state_poller.py
mwiegand 0dc61d5de4
feat(live-state): start daemon poller, prune history, close stuck sessions
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 22:10:55 +02:00

371 lines
12 KiB
Python

"""Live-state poller tests.
Each test seeds an app + DB, monkeypatches the RCON client to return a
canned StatusResponse, and asserts on what the poller writes.
"""
from __future__ import annotations
from datetime import datetime, timedelta, UTC
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.db import init_db, session_scope
from l4d2web.models import (
Blueprint,
Server,
ServerLiveState,
ServerPlayerSession,
SteamUserProfile,
User,
)
from l4d2web.services import live_state_poller
from l4d2web.services import steam_users
from l4d2web.services.rcon import PlayerRow, StatusResponse
def _seed(tmp_path):
db_url = f"sqlite:///{tmp_path/'p.db'}"
import os
os.environ["DATABASE_URL"] = db_url
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "x"})
init_db()
with session_scope() as db:
u = User(username="u", password_digest="x"); db.add(u); db.flush()
bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
s = Server(
user_id=u.id, blueprint_id=bp.id, name="s", port=27500,
rcon_password="pw", actual_state="running",
)
db.add(s); db.flush()
return app, s.id
def _status(players: int, map_: str = "c1m1_hotel", hibernating: bool = False,
roster: list[PlayerRow] | None = None) -> StatusResponse:
return StatusResponse(
map=map_, players=players, max_players=4, bots=0,
hibernating=hibernating, roster=roster or [],
)
def test_rle_bumps_last_seen_when_state_unchanged(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
monkeypatch.setattr(
live_state_poller, "query_status",
lambda host, port, password, timeout: _status(players=0),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
rows = db.scalars(
select(ServerLiveState).where(ServerLiveState.server_id == sid)
.order_by(ServerLiveState.started_at)
).all()
assert len(rows) == 1
assert rows[0].players == 0
assert rows[0].last_seen_at >= rows[0].started_at
def test_rle_inserts_new_row_on_state_change(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
snapshots = iter([_status(players=0), _status(players=1)])
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: next(snapshots),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
rows = db.scalars(
select(ServerLiveState).where(ServerLiveState.server_id == sid)
.order_by(ServerLiveState.started_at)
).all()
assert [r.players for r in rows] == [0, 1]
def test_skips_servers_without_rcon_password(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
with session_scope() as db:
s = db.scalar(select(Server).where(Server.id == sid))
s.rcon_password = ""
called: list = []
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: called.append(1) or _status(0),
)
with app.app_context():
live_state_poller.poll_once()
assert called == []
def test_skips_non_running_servers(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
with session_scope() as db:
s = db.scalar(select(Server).where(Server.id == sid))
s.actual_state = "stopped"
called: list = []
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: called.append(1) or _status(0),
)
with app.app_context():
live_state_poller.poll_once()
assert called == []
def _player(steam_id: str = "76561197960828710", name: str = "Alice",
connected: int = 21, ping: int = 60) -> PlayerRow:
return PlayerRow(
steam_id_64=steam_id, name=name, connected_seconds=connected, ping=ping,
)
def test_new_player_opens_session_with_backfilled_join(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player(connected=30)]),
)
with app.app_context():
live_state_poller.poll_once()
with session_scope() as db:
sessions = db.scalars(
select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
).all()
assert len(sessions) == 1
s = sessions[0]
assert s.steam_id_64 == "76561197960828710"
assert s.name_at_join == "Alice"
assert s.left_at is None
assert s.min_ping == 60
assert s.max_ping == 60
# joined_at should be ~30s before now
delta = (datetime.now(UTC).replace(tzinfo=None) - s.joined_at).total_seconds()
assert 25 <= delta <= 60
def test_session_closes_when_player_no_longer_in_roster(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
snapshots = iter([
_status(players=1, roster=[_player()]),
_status(players=0, roster=[]),
])
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: next(snapshots),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
sessions = db.scalars(
select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
).all()
assert len(sessions) == 1
assert sessions[0].left_at is not None
def test_session_ping_range_extends(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
snapshots = iter([
_status(players=1, roster=[_player(ping=60)]),
_status(players=1, roster=[_player(ping=200)]),
_status(players=1, roster=[_player(ping=40)]),
])
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: next(snapshots),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
s = db.scalar(select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid))
assert s.min_ping == 40
assert s.max_ping == 200
def test_session_skips_bots(tmp_path, monkeypatch) -> None:
# Bots have non-STEAM uniqueid; our parser already drops them, but verify
# that even if a non-STEAM steam_id_64 makes it through, sessions filter
# it out. We exercise the filter directly by giving a string that doesn't
# match the expected SteamID64 numeric form.
app, sid = _seed(tmp_path)
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(
players=0, roster=[_player(steam_id="BOT", name="Coach")]
),
)
with app.app_context():
live_state_poller.poll_once()
with session_scope() as db:
sessions = db.scalars(
select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
).all()
assert sessions == []
def test_enriches_missing_profile(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STEAM_WEB_API_KEY"] = "KEY"
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player()]),
)
captured: list = []
def fake_fetch(ids, *, api_key):
captured.append((list(ids), api_key))
return [steam_users.SteamProfile(
steam_id_64="76561197960828710",
persona_name="Alice",
avatar_url="https://avatars.../alice_medium.jpg",
)]
monkeypatch.setattr(live_state_poller, "fetch_profiles_batch", fake_fetch)
with app.app_context():
live_state_poller.poll_once()
assert captured and captured[0][1] == "KEY"
with session_scope() as db:
p = db.scalar(select(SteamUserProfile))
assert p is not None
assert p.persona_name == "Alice"
def test_skips_enrichment_when_api_key_unset(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STEAM_WEB_API_KEY"] = ""
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player()]),
)
monkeypatch.setattr(
live_state_poller, "fetch_profiles_batch",
lambda ids, *, api_key: pytest.fail("must not call without key"),
)
with app.app_context():
live_state_poller.poll_once()
def test_retention_trims_old_rows(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["LIVE_STATE_HISTORY_DAYS"] = 30
long_ago = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=45)
with session_scope() as db:
db.add(ServerLiveState(
server_id=sid, started_at=long_ago, last_seen_at=long_ago,
players=0, max_players=4, bots=0, map="old", hibernating=True,
))
db.add(ServerPlayerSession(
server_id=sid, steam_id_64="76561197960828710",
joined_at=long_ago, left_at=long_ago,
name_at_join="OldPlayer", min_ping=10, max_ping=10,
))
with app.app_context():
live_state_poller.prune_history()
with session_scope() as db:
snaps = db.scalars(select(ServerLiveState)).all()
sess = db.scalars(select(ServerPlayerSession)).all()
assert snaps == []
assert sess == []
def test_close_stuck_sessions_after_threshold(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STUCK_SESSION_SECONDS"] = 60
way_back = datetime.now(UTC).replace(tzinfo=None) - timedelta(hours=2)
with session_scope() as db:
db.add(ServerPlayerSession(
server_id=sid, steam_id_64="76561197960828710",
joined_at=way_back, left_at=None,
name_at_join="GhostPlayer", min_ping=10, max_ping=10,
))
# Server is in `targets` but the RCON call fails — i.e., we haven't seen
# this server respond in a long time. Poller must close stuck sessions.
def boom(*a, **kw):
from l4d2web.services.rcon import RconError
raise RconError("simulated outage")
monkeypatch.setattr(live_state_poller, "query_status", boom)
with app.app_context():
live_state_poller.poll_once()
with session_scope() as db:
row = db.scalar(select(ServerPlayerSession))
assert row.left_at is not None
def test_start_live_state_poller_skipped_during_testing(monkeypatch, tmp_path) -> None:
from l4d2web import app as app_module
called: list = []
monkeypatch.setattr(
app_module, "start_live_state_poller", lambda app: called.append(app)
)
app_module.create_app({"TESTING": True, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"})
assert called == []
def test_start_live_state_poller_started_outside_testing(monkeypatch, tmp_path) -> None:
from l4d2web import app as app_module
called: list = []
monkeypatch.setattr(
app_module, "start_live_state_poller", lambda app: called.append(app)
)
app = app_module.create_app(
{"TESTING": False, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"}
)
assert called == [app]
def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STEAM_WEB_API_KEY"] = "KEY"
with session_scope() as db:
db.add(SteamUserProfile(
steam_id_64="76561197960828710",
persona_name="cached",
avatar_url="cached.jpg",
fetched_at=datetime.now(UTC).replace(tzinfo=None),
))
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player()]),
)
called: list = []
monkeypatch.setattr(
live_state_poller, "fetch_profiles_batch",
lambda ids, *, api_key: called.append(list(ids)) or [],
)
with app.app_context():
live_state_poller.poll_once()
assert called == []