left4me/l4d2web/services/live_state_poller.py
mwiegand be476112ee
feat(live-state): enrich roster with cached Steam profiles
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 22:02:58 +02:00

188 lines
6.4 KiB
Python

"""Background poller that maintains live game-server state in the DB.
Modeled on l4d2web/services/job_worker.py:617-647. This module owns:

- per-server snapshot writes with run-length encoding into
  `server_live_state`
- player-session lifecycle in `server_player_session` (Task 7)
- Steam profile enrichment into `steam_user_profile` (Task 8)
- retention pruning and stuck-session closure (Task 10)

This file is built up across Tasks 6-10.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, UTC
from flask import current_app
from sqlalchemy import select
from l4d2web.db import session_scope
from l4d2web.models import (
Server,
ServerLiveState,
ServerPlayerSession,
SteamUserProfile,
)
from l4d2web.services.rcon import RconError, StatusResponse, query_status
from l4d2web.services.steam_users import fetch_profiles_batch
logger = logging.getLogger(__name__)
def _now() -> datetime:
return datetime.now(UTC).replace(tzinfo=None)
def poll_once() -> None:
    """Poll every eligible server once and persist what it reports.

    Eligible servers are those whose ``actual_state`` is ``"running"`` and
    whose ``rcon_password`` is not the empty string. Each one is queried over
    RCON on 127.0.0.1 with a 2 second timeout; a server whose query raises
    ``RconError`` is logged and skipped. For every successful query the
    snapshot is recorded and player sessions are reconciled. Steam profile
    enrichment runs only when ``STEAM_WEB_API_KEY`` is configured and
    non-empty.
    """
    eligible_servers = select(Server).where(
        Server.actual_state == "running",
        Server.rcon_password != "",
    )
    with session_scope() as db:
        # Copy the needed columns into plain tuples while the session is open.
        targets = [
            (server.id, server.port, server.rcon_password)
            for server in db.scalars(eligible_servers).all()
        ]

    config = current_app.config
    api_key = config.get("STEAM_WEB_API_KEY", "") or ""
    ttl_seconds = int(config.get("STEAM_PROFILE_TTL_SECONDS", 86400))

    for server_id, port, password in targets:
        try:
            status = query_status("127.0.0.1", port, password, timeout=2.0)
        except RconError:
            logger.warning("rcon query failed for server %d", server_id, exc_info=True)
        else:
            _record_snapshot(server_id, status)
            _reconcile_sessions(server_id, status)
            if api_key:
                _enrich_profiles(status, api_key=api_key, ttl_seconds=ttl_seconds)
def _enrich_profiles(status: StatusResponse, *, api_key: str, ttl_seconds: int) -> None:
    """Refresh cached Steam profiles for roster members that need it.

    Roster entries whose id does not pass ``_is_valid_steam_id_64`` are
    ignored. An id needs fetching when it has no ``SteamUserProfile`` row
    whose ``fetched_at`` is within the last ``ttl_seconds``. Those ids are
    requested in one ``fetch_profiles_batch`` call and each returned profile
    is inserted or updated with a fresh ``fetched_at``.

    Any exception raised by the fetch is logged as a warning and swallowed;
    nothing is written in that case.
    """
    wanted_ids = {
        player.steam_id_64
        for player in status.roster
        if _is_valid_steam_id_64(player.steam_id_64)
    }
    if not wanted_ids:
        return

    stale_before = _now() - timedelta(seconds=ttl_seconds)
    still_fresh = select(SteamUserProfile.steam_id_64).where(
        SteamUserProfile.steam_id_64.in_(wanted_ids),
        SteamUserProfile.fetched_at >= stale_before,
    )
    with session_scope() as db:
        fresh_ids = set(db.scalars(still_fresh).all())

    # Sorted so the request is deterministic for a given roster.
    ids_to_fetch = sorted(wanted_ids - fresh_ids)
    if not ids_to_fetch:
        return

    try:
        fetched = fetch_profiles_batch(ids_to_fetch, api_key=api_key)
    except Exception:  # network / API errors are soft-fail
        logger.warning("steam profile enrichment failed", exc_info=True)
        return

    fetched_at = _now()
    with session_scope() as db:
        for profile in fetched:
            cached = db.get(SteamUserProfile, profile.steam_id_64)
            if cached is not None:
                # Known player: overwrite the cached fields in place.
                cached.persona_name = profile.persona_name
                cached.avatar_url = profile.avatar_url
                cached.fetched_at = fetched_at
                continue
            db.add(SteamUserProfile(
                steam_id_64=profile.steam_id_64,
                persona_name=profile.persona_name,
                avatar_url=profile.avatar_url,
                fetched_at=fetched_at,
            ))
def _record_snapshot(server_id: int, status: StatusResponse) -> None:
    """Store *status* for a server using run-length encoding.

    The row with the greatest ``started_at`` for ``server_id`` is looked up.
    If it exists and ``_matches`` says it describes the same state, only its
    ``last_seen_at`` is moved forward; otherwise a new ``ServerLiveState``
    row is added whose ``started_at`` and ``last_seen_at`` are both "now".
    """
    now = _now()
    newest_row_query = (
        select(ServerLiveState)
        .where(ServerLiveState.server_id == server_id)
        .order_by(ServerLiveState.started_at.desc())
        .limit(1)
    )
    with session_scope() as db:
        latest = db.scalars(newest_row_query).first()
        state_unchanged = latest is not None and _matches(latest, status)
        if state_unchanged:
            # Same state as before: extend the current run.
            latest.last_seen_at = now
        else:
            # First snapshot or a state change: start a new run.
            db.add(
                ServerLiveState(
                    server_id=server_id,
                    started_at=now,
                    last_seen_at=now,
                    players=status.players,
                    max_players=status.max_players,
                    bots=status.bots,
                    map=status.map,
                    hibernating=status.hibernating,
                )
            )
def _matches(row: ServerLiveState, status: StatusResponse) -> bool:
    """Tell whether a stored snapshot row describes the same state as *status*.

    The two are considered equal when ``players``, ``max_players``, ``bots``,
    ``map`` and ``hibernating`` all compare equal. Fields are checked in that
    order and the comparison stops at the first difference.
    """
    compared_fields = ("players", "max_players", "bots", "map", "hibernating")
    return all(
        getattr(row, field) == getattr(status, field)
        for field in compared_fields
    )
_STEAM_ID_64_PREFIX = "7656"  # all SteamID64s start with this; bots/anon do not
_STEAM_ID_64_LENGTH = 17  # number of decimal digits in a SteamID64


def _is_valid_steam_id_64(value: str) -> bool:
    """Return True if *value* has the shape of a SteamID64.

    A value qualifies when it is a ``str`` of exactly 17 ASCII decimal digits
    beginning with ``"7656"``. Anything else, including non-string values
    such as ``None``, is rejected rather than raising, so the function can be
    used directly as a roster filter.
    """
    if not isinstance(value, str):
        return False
    return (
        len(value) == _STEAM_ID_64_LENGTH
        # str.isdigit() alone also accepts non-ASCII digits (superscripts,
        # Arabic-Indic numerals, ...), which are never part of a SteamID64.
        and value.isascii()
        and value.isdigit()
        and value.startswith(_STEAM_ID_64_PREFIX)
    )
def _reconcile_sessions(server_id: int, status: StatusResponse) -> None:
    """Bring one server's open player sessions in line with its roster.

    Only roster entries whose id passes ``_is_valid_steam_id_64`` take part.
    Within a single DB session this:

    - sets ``left_at`` on every open session (``left_at IS NULL``) of this
      server whose player is no longer in the roster;
    - opens a session for each roster player that has none, back-dating
      ``joined_at`` by the player's ``connected_seconds``;
    - widens ``min_ping`` / ``max_ping`` of already-open sessions with the
      current ping.
    """
    now = _now()
    roster = [p for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)]
    seen_ids = {p.steam_id_64 for p in roster}
    with session_scope() as db:
        open_rows = db.scalars(
            select(ServerPlayerSession).where(
                ServerPlayerSession.server_id == server_id,
                ServerPlayerSession.left_at.is_(None),
            )
        ).all()

        # Close sessions for players no longer in the roster. Walk the rows
        # themselves rather than the per-player dict built below: the dict
        # keeps only one row per steam id, so if a player ever has more than
        # one open row the extra ones would otherwise never be closed.
        for row in open_rows:
            if row.steam_id_64 not in seen_ids:
                row.left_at = now

        open_by_sid = {r.steam_id_64: r for r in open_rows}

        # Open / update sessions for current roster.
        for p in roster:
            existing = open_by_sid.get(p.steam_id_64)
            if existing is None:
                opened = ServerPlayerSession(
                    server_id=server_id,
                    steam_id_64=p.steam_id_64,
                    joined_at=now - timedelta(seconds=p.connected_seconds),
                    left_at=None,
                    name_at_join=p.name,
                    min_ping=p.ping,
                    max_ping=p.ping,
                )
                db.add(opened)
                # Track the new session so a repeated roster entry for the
                # same steam id updates it instead of opening a second one.
                open_by_sid[p.steam_id_64] = opened
            else:
                if p.ping < existing.min_ping:
                    existing.min_ping = p.ping
                if p.ping > existing.max_ping:
                    existing.max_ping = p.ping