"""Background poller that maintains live game-server state in the DB. Modeled on l4d2web/services/job_worker.py:617-647. This module owns: - per-server snapshot writes with run-length encoding into `server_live_state` - player-session lifecycle in `server_player_session` (Task 7) - Steam profile enrichment into `steam_user_profile` (Task 8) - retention pruning and stuck-session closure (Task 10) This file is built up across Tasks 6-10. """ from __future__ import annotations import logging import threading import time from datetime import datetime, timedelta, UTC from flask import current_app from sqlalchemy import select from l4d2web.db import session_scope from l4d2web.models import ( Server, ServerLiveState, ServerPlayerSession, SteamUserProfile, ) from l4d2web.services.rcon import RconError, StatusResponse, query_status from l4d2web.services.steam_users import fetch_profiles_batch logger = logging.getLogger(__name__) def _now() -> datetime: return datetime.now(UTC).replace(tzinfo=None) def poll_once() -> None: """One pass over all running servers with a configured rcon_password.""" with session_scope() as db: servers = db.scalars( select(Server) .where(Server.actual_state == "running") .where(Server.rcon_password != "") ).all() targets = [(s.id, s.port, s.rcon_password) for s in servers] api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or "" ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400)) timeout = float(current_app.config.get("LIVE_STATE_QUERY_TIMEOUT_SECONDS", 2.0)) for server_id, port, password in targets: try: status = query_status("127.0.0.1", port, password, timeout=timeout) except RconError: logger.warning("rcon query failed for server %d", server_id, exc_info=True) _close_stuck_sessions(server_id) continue _record_snapshot(server_id, status) _reconcile_sessions(server_id, status) if api_key: _enrich_profiles(status, api_key=api_key, ttl_seconds=ttl_seconds) def _enrich_profiles(status: StatusResponse, *, api_key: str, ttl_seconds: int) -> None: """Fetch+cache Steam profile data for any roster IDs missing or stale.""" roster_ids = {p.steam_id_64 for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)} if not roster_ids: return cutoff = _now() - timedelta(seconds=ttl_seconds) with session_scope() as db: fresh = set(db.scalars( select(SteamUserProfile.steam_id_64).where( SteamUserProfile.steam_id_64.in_(roster_ids), SteamUserProfile.fetched_at >= cutoff, ) ).all()) needs_fetch = sorted(roster_ids - fresh) if not needs_fetch: return try: profiles = fetch_profiles_batch(needs_fetch, api_key=api_key) except Exception: # network / API errors are soft-fail logger.warning("steam profile enrichment failed", exc_info=True) return now = _now() with session_scope() as db: for p in profiles: row = db.get(SteamUserProfile, p.steam_id_64) if row is None: db.add(SteamUserProfile( steam_id_64=p.steam_id_64, persona_name=p.persona_name, avatar_url=p.avatar_url, fetched_at=now, )) else: row.persona_name = p.persona_name row.avatar_url = p.avatar_url row.fetched_at = now def prune_history() -> None: """Delete snapshots and closed sessions older than retention.""" days = int(current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30)) cutoff = _now() - timedelta(days=days) with session_scope() as db: db.query(ServerLiveState).filter( ServerLiveState.last_seen_at < cutoff ).delete(synchronize_session=False) db.query(ServerPlayerSession).filter( ServerPlayerSession.left_at.is_not(None), ServerPlayerSession.left_at < cutoff, ).delete(synchronize_session=False) def _close_stuck_sessions(server_id: int) -> None: """If a server has open sessions whose joined_at is older than threshold AND we've failed to observe them, close them as stuck.""" seconds = int(current_app.config.get("STUCK_SESSION_SECONDS", 60)) cutoff = _now() - timedelta(seconds=seconds) with session_scope() as db: rows = db.scalars( select(ServerPlayerSession).where( ServerPlayerSession.server_id == server_id, ServerPlayerSession.left_at.is_(None), ServerPlayerSession.joined_at < cutoff, ) ).all() for row in rows: row.left_at = _now() def _record_snapshot(server_id: int, status: StatusResponse) -> None: """RLE write: bump last_seen_at if state matches, else insert a new row.""" now = _now() with session_scope() as db: latest = db.scalars( select(ServerLiveState) .where(ServerLiveState.server_id == server_id) .order_by(ServerLiveState.started_at.desc()) .limit(1) ).first() if latest is not None and _matches(latest, status): latest.last_seen_at = now return db.add( ServerLiveState( server_id=server_id, started_at=now, last_seen_at=now, players=status.players, max_players=status.max_players, bots=status.bots, map=status.map, hibernating=status.hibernating, ) ) def _matches(row: ServerLiveState, status: StatusResponse) -> bool: return ( row.players == status.players and row.max_players == status.max_players and row.bots == status.bots and row.map == status.map and row.hibernating == status.hibernating ) _STEAM_ID_64_PREFIX = "7656" # all SteamID64s start with this; bots/anon do not def _is_valid_steam_id_64(value: str) -> bool: return value.startswith(_STEAM_ID_64_PREFIX) and value.isdigit() and len(value) == 17 def _reconcile_sessions(server_id: int, status: StatusResponse) -> None: """Open new sessions, update ping ranges, close departed sessions.""" now = _now() roster = [p for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)] seen_ids = {p.steam_id_64 for p in roster} with session_scope() as db: open_rows = db.scalars( select(ServerPlayerSession).where( ServerPlayerSession.server_id == server_id, ServerPlayerSession.left_at.is_(None), ) ).all() open_by_sid = {r.steam_id_64: r for r in open_rows} # Close sessions for players no longer in the roster. for sid, row in open_by_sid.items(): if sid not in seen_ids: row.left_at = now # Open / update sessions for current roster. for p in roster: existing = open_by_sid.get(p.steam_id_64) if existing is None: db.add( ServerPlayerSession( server_id=server_id, steam_id_64=p.steam_id_64, joined_at=now - timedelta(seconds=p.connected_seconds), left_at=None, name_at_join=p.name, min_ping=p.ping, max_ping=p.ping, ) ) else: if p.ping < existing.min_ping: existing.min_ping = p.ping if p.ping > existing.max_ping: existing.max_ping = p.ping _poller_started_lock = threading.Lock() _poller_started = False def start_live_state_poller(app) -> None: """Spawn the daemon poller thread once per process.""" global _poller_started with _poller_started_lock: if _poller_started: return _poller_started = True interval = float(app.config.get("LIVE_STATE_POLL_SECONDS", 5)) retention_every = max(1, int(app.config.get("LIVE_STATE_RETENTION_EVERY_TICKS", 60))) thread = threading.Thread( target=_poller_loop, args=(app, interval, retention_every), name="left4me-live-state-poller", daemon=True, ) thread.start() def _poller_loop(app, interval: float, retention_every: int) -> None: tick = 0 while True: try: with app.app_context(): poll_once() if tick % retention_every == 0: prune_history() except Exception: logger.exception("live-state poller loop exception") tick += 1 time.sleep(interval)