"""Live-state poller tests. Each test seeds an app + DB, monkeypatches the RCON client to return a canned StatusResponse, and asserts on what the poller writes. """ from __future__ import annotations from datetime import datetime, timedelta, UTC import pytest from sqlalchemy import select from l4d2web.app import create_app from l4d2web.db import init_db, session_scope from l4d2web.models import ( Blueprint, Server, ServerLiveState, ServerPlayerSession, SteamUserProfile, User, ) from l4d2web.services import live_state_poller from l4d2web.services import steam_users from l4d2web.services.rcon import PlayerRow, StatusResponse def _seed(tmp_path): db_url = f"sqlite:///{tmp_path/'p.db'}" import os os.environ["DATABASE_URL"] = db_url app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "x"}) init_db() with session_scope() as db: u = User(username="u", password_digest="x"); db.add(u); db.flush() bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush() s = Server( user_id=u.id, blueprint_id=bp.id, name="s", port=27500, rcon_password="pw", actual_state="running", ) db.add(s); db.flush() return app, s.id def _status(players: int, map_: str = "c1m1_hotel", hibernating: bool = False, roster: list[PlayerRow] | None = None) -> StatusResponse: return StatusResponse( map=map_, players=players, max_players=4, bots=0, hibernating=hibernating, roster=roster or [], ) def test_rle_bumps_last_seen_when_state_unchanged(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) monkeypatch.setattr( live_state_poller, "query_status", lambda host, port, password, timeout: _status(players=0), ) with app.app_context(): live_state_poller.poll_once() live_state_poller.poll_once() with session_scope() as db: rows = db.scalars( select(ServerLiveState).where(ServerLiveState.server_id == sid) .order_by(ServerLiveState.started_at) ).all() assert len(rows) == 1 assert rows[0].players == 0 assert rows[0].last_seen_at >= rows[0].started_at def test_rle_inserts_new_row_on_state_change(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) snapshots = iter([_status(players=0), _status(players=1)]) monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: next(snapshots), ) with app.app_context(): live_state_poller.poll_once() live_state_poller.poll_once() with session_scope() as db: rows = db.scalars( select(ServerLiveState).where(ServerLiveState.server_id == sid) .order_by(ServerLiveState.started_at) ).all() assert [r.players for r in rows] == [0, 1] def test_skips_servers_without_rcon_password(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) with session_scope() as db: s = db.scalar(select(Server).where(Server.id == sid)) s.rcon_password = "" called: list = [] monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: called.append(1) or _status(0), ) with app.app_context(): live_state_poller.poll_once() assert called == [] def test_skips_non_running_servers(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) with session_scope() as db: s = db.scalar(select(Server).where(Server.id == sid)) s.actual_state = "stopped" called: list = [] monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: called.append(1) or _status(0), ) with app.app_context(): live_state_poller.poll_once() assert called == [] def _player(steam_id: str = "76561197960828710", name: str = "Alice", connected: int = 21, ping: int = 60) -> PlayerRow: return PlayerRow( steam_id_64=steam_id, name=name, connected_seconds=connected, ping=ping, ) def test_new_player_opens_session_with_backfilled_join(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: _status(players=1, roster=[_player(connected=30)]), ) with app.app_context(): live_state_poller.poll_once() with session_scope() as db: sessions = db.scalars( select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid) ).all() assert len(sessions) == 1 s = sessions[0] assert s.steam_id_64 == "76561197960828710" assert s.name_at_join == "Alice" assert s.left_at is None assert s.min_ping == 60 assert s.max_ping == 60 # joined_at should be ~30s before now delta = (datetime.now(UTC) - s.joined_at).total_seconds() assert 25 <= delta <= 60 def test_session_closes_when_player_no_longer_in_roster(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) snapshots = iter([ _status(players=1, roster=[_player()]), _status(players=0, roster=[]), ]) monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: next(snapshots), ) with app.app_context(): live_state_poller.poll_once() live_state_poller.poll_once() with session_scope() as db: sessions = db.scalars( select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid) ).all() assert len(sessions) == 1 assert sessions[0].left_at is not None def test_session_ping_range_extends(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) snapshots = iter([ _status(players=1, roster=[_player(ping=60)]), _status(players=1, roster=[_player(ping=200)]), _status(players=1, roster=[_player(ping=40)]), ]) monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: next(snapshots), ) with app.app_context(): live_state_poller.poll_once() live_state_poller.poll_once() live_state_poller.poll_once() with session_scope() as db: s = db.scalar(select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)) assert s.min_ping == 40 assert s.max_ping == 200 def test_session_skips_bots(tmp_path, monkeypatch) -> None: # Bots have non-STEAM uniqueid; our parser already drops them, but verify # that even if a non-STEAM steam_id_64 makes it through, sessions filter # it out. We exercise the filter directly by giving a string that doesn't # match the expected SteamID64 numeric form. app, sid = _seed(tmp_path) monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: _status( players=0, roster=[_player(steam_id="BOT", name="Coach")] ), ) with app.app_context(): live_state_poller.poll_once() with session_scope() as db: sessions = db.scalars( select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid) ).all() assert sessions == [] def test_enriches_missing_profile(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) app.config["STEAM_WEB_API_KEY"] = "KEY" monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: _status(players=1, roster=[_player()]), ) captured: list = [] def fake_fetch(ids, *, api_key): captured.append((list(ids), api_key)) return [steam_users.SteamProfile( steam_id_64="76561197960828710", persona_name="Alice", avatar_url="https://avatars.../alice_medium.jpg", )] monkeypatch.setattr(live_state_poller, "fetch_profiles_batch", fake_fetch) with app.app_context(): live_state_poller.poll_once() assert captured and captured[0][1] == "KEY" with session_scope() as db: p = db.scalar(select(SteamUserProfile)) assert p is not None assert p.persona_name == "Alice" def test_skips_enrichment_when_api_key_unset(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) app.config["STEAM_WEB_API_KEY"] = "" monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: _status(players=1, roster=[_player()]), ) monkeypatch.setattr( live_state_poller, "fetch_profiles_batch", lambda ids, *, api_key: pytest.fail("must not call without key"), ) with app.app_context(): live_state_poller.poll_once() def test_retention_trims_old_rows(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) app.config["LIVE_STATE_HISTORY_DAYS"] = 30 long_ago = datetime.now(UTC) - timedelta(days=45) with session_scope() as db: db.add(ServerLiveState( server_id=sid, started_at=long_ago, last_seen_at=long_ago, players=0, max_players=4, bots=0, map="old", hibernating=True, )) db.add(ServerPlayerSession( server_id=sid, steam_id_64="76561197960828710", joined_at=long_ago, left_at=long_ago, name_at_join="OldPlayer", min_ping=10, max_ping=10, )) with app.app_context(): live_state_poller.prune_history() with session_scope() as db: snaps = db.scalars(select(ServerLiveState)).all() sess = db.scalars(select(ServerPlayerSession)).all() assert snaps == [] assert sess == [] def test_close_stuck_sessions_after_threshold(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) app.config["STUCK_SESSION_SECONDS"] = 60 way_back = datetime.now(UTC) - timedelta(hours=2) with session_scope() as db: db.add(ServerPlayerSession( server_id=sid, steam_id_64="76561197960828710", joined_at=way_back, left_at=None, name_at_join="GhostPlayer", min_ping=10, max_ping=10, )) # Server is in `targets` but the RCON call fails — i.e., we haven't seen # this server respond in a long time. Poller must close stuck sessions. def boom(*a, **kw): from l4d2web.services.rcon import RconError raise RconError("simulated outage") monkeypatch.setattr(live_state_poller, "query_status", boom) with app.app_context(): live_state_poller.poll_once() with session_scope() as db: row = db.scalar(select(ServerPlayerSession)) assert row.left_at is not None def test_start_live_state_poller_skipped_during_testing(monkeypatch, tmp_path) -> None: from l4d2web import app as app_module called: list = [] monkeypatch.setattr( app_module, "start_live_state_poller", lambda app: called.append(app) ) app_module.create_app({"TESTING": True, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"}) assert called == [] def test_start_live_state_poller_started_outside_testing(monkeypatch, tmp_path) -> None: from l4d2web import app as app_module called: list = [] monkeypatch.setattr( app_module, "start_live_state_poller", lambda app: called.append(app) ) app = app_module.create_app( {"TESTING": False, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"} ) assert called == [app] def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None: app, sid = _seed(tmp_path) app.config["STEAM_WEB_API_KEY"] = "KEY" with session_scope() as db: db.add(SteamUserProfile( steam_id_64="76561197960828710", persona_name="cached", avatar_url="cached.jpg", fetched_at=datetime.now(UTC), )) monkeypatch.setattr( live_state_poller, "query_status", lambda *a, **kw: _status(players=1, roster=[_player()]), ) called: list = [] monkeypatch.setattr( live_state_poller, "fetch_profiles_batch", lambda ids, *, api_key: called.append(list(ids)) or [], ) with app.app_context(): live_state_poller.poll_once() assert called == []