left4me/docs/superpowers/plans/2026-05-12-server-live-state-display.md
mwiegand e25e7098f6
refactor(live-state): drop redundant ix_sps_server_recent index
The two indexes ix_sps_server_open and ix_sps_server_recent were
byte-identical because SQLAlchemy's Index(name, *cols) form drops the
DESC ordering the spec intended. Rather than reach for text("left_at
DESC"), drop the second index entirely — SQLite scans the ASC index
backwards at no measurable cost. Spec and plan updated to match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 21:27:01 +02:00

2597 lines
87 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Server Live-State Display Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add a read-side live-state display to the web UI — counts/map/hibernating on the server list, plus a server-detail panel with current + recent players (avatars from Steam) — backed by a persistent history table.
**Architecture:** A daemon-thread poller (modeled on the existing `start_state_poller` in `l4d2web/services/job_worker.py:617-647`) polls each running game server's RCON port every 5s, writes run-length-encoded snapshots to `server_live_state`, maintains player session intervals in `server_player_session`, and enriches Steam IDs via the Steam Web API into `steam_user_profile` (24h cache). RCON passwords are per-server, auto-generated, and injected into `server.cfg` via the existing spec yaml pipeline — no host-side changes.
**Tech Stack:** Python 3.12+, Flask, SQLAlchemy 2.x, Alembic, `requests` (already used by `steam_workshop.py`), pure-stdlib `socket`/`struct` for the RCON protocol, HTMX for the auto-refreshing detail panel, pytest.
**Spec:** `docs/superpowers/specs/2026-05-12-server-live-state-display-design.md`
---
## File Structure
**Create:**
- `l4d2web/alembic/versions/0010_server_live_state.py` — schema migration: one new column on `servers`, three new tables, password backfill for existing rows
- `l4d2web/services/rcon.py` — Source RCON client + `status` parser
- `l4d2web/services/steam_users.py` — Steam Web API client (mirrors `l4d2web/services/steam_workshop.py:17-43`)
- `l4d2web/services/live_state_poller.py` — daemon thread + poll loop + RLE snapshot + session reconciler
- `l4d2web/templates/_live_state.html` — HTMX-refreshed fragment with summary + current + recent blocks
- `l4d2web/tests/test_rcon.py`, `l4d2web/tests/test_steam_users.py`, `l4d2web/tests/test_live_state_poller.py`
**Modify:**
- `l4d2web/models.py``Server.rcon_password` column + three new model classes
- `l4d2web/services/l4d2_facade.py:28-52``build_server_spec_payload` appends `rcon_password "..."`
- `l4d2web/routes/server_routes.py:58-83``create_server` generates a password; add `live_state_fragment` route
- `l4d2web/templates/servers.html` — inline badge column per server row
- `l4d2web/templates/server_detail.html` — include `_live_state.html` inside an HTMX wrapper
- `l4d2web/app.py` — call `start_live_state_poller(app)` next to existing `start_state_poller`
- `l4d2web/config.py` — register seven new config keys
- `deploy/templates/etc/left4me/web.env` — add `STEAM_WEB_API_KEY=`
- CSP / response headers (find existing setup during Task 11) — add `avatars.*.steamstatic.com` to `img-src`
**Reused (no changes):**
- `l4d2web/services/job_worker.py:617-647` — pattern reference for daemon-thread / poll-loop
- `l4d2web/services/steam_workshop.py:17-43` — pattern reference for the Steam Web API client (`requests.Session`, 30s timeout, threaded executor)
- `l4d2host/instances.py:40-58` — already writes `spec.config` verbatim to `server.cfg`, so injected `rcon_password` line lands automatically
- `l4d2web/templates/_overlay_build_status.html:1-5` — HTMX polling pattern reference
---
## Task 1: Schema migration + ORM models
**Files:**
- Create: `l4d2web/alembic/versions/0010_server_live_state.py`
- Modify: `l4d2web/models.py`
- Test: `l4d2web/tests/test_models.py` (extend or create), `l4d2web/tests/test_migrations.py` (extend or create — check existing test layout)
- [ ] **Step 1: Write the failing model + migration tests**
Add to `l4d2web/tests/test_models.py` (or create it):
```python
import secrets
from sqlalchemy import select
from l4d2web.db import init_db, session_scope
from l4d2web.models import (
Server,
ServerLiveState,
ServerPlayerSession,
SteamUserProfile,
User,
Blueprint,
)
def test_server_has_rcon_password_column(tmp_path, monkeypatch) -> None:
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
init_db()
with session_scope() as db:
u = User(username="u", password_digest="x")
db.add(u); db.flush()
bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]")
db.add(bp); db.flush()
s = Server(
user_id=u.id, blueprint_id=bp.id, name="s", port=27500,
rcon_password="abc",
)
db.add(s); db.flush()
assert db.scalar(select(Server.rcon_password).where(Server.id == s.id)) == "abc"
def test_server_live_state_table_columns(tmp_path, monkeypatch) -> None:
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
init_db()
# Smoke check the table exists with the expected columns by inserting a row.
with session_scope() as db:
u = User(username="u", password_digest="x"); db.add(u); db.flush()
bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
s = Server(user_id=u.id, blueprint_id=bp.id, name="s", port=27501, rcon_password="x")
db.add(s); db.flush()
row = ServerLiveState(
server_id=s.id, started_at=now_utc_aware(), last_seen_at=now_utc_aware(),
players=2, max_players=4, bots=0, map="c1m1_hotel", hibernating=False,
)
db.add(row); db.flush()
def test_server_player_session_table_columns(tmp_path, monkeypatch) -> None:
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
init_db()
with session_scope() as db:
u = User(username="u", password_digest="x"); db.add(u); db.flush()
bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
s = Server(user_id=u.id, blueprint_id=bp.id, name="s", port=27502, rcon_password="x")
db.add(s); db.flush()
row = ServerPlayerSession(
server_id=s.id, steam_id_64="76561197960828710",
joined_at=now_utc_aware(), left_at=None,
name_at_join="Crone", min_ping=42, max_ping=185,
)
db.add(row); db.flush()
def test_steam_user_profile_table_columns(tmp_path, monkeypatch) -> None:
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
init_db()
with session_scope() as db:
row = SteamUserProfile(
steam_id_64="76561197960828710",
persona_name="MrCool42",
avatar_url="https://avatars.cloudflare.steamstatic.com/abc_medium.jpg",
fetched_at=now_utc_aware(),
)
db.add(row); db.flush()
```
Add this helper next to the imports (or import `now_utc` from models):
```python
from l4d2web.models import now_utc as now_utc_aware
```
- [ ] **Step 2: Run model tests to verify they fail**
Run: `pytest l4d2web/tests/test_models.py -v`
Expected: FAIL — `ImportError: cannot import name 'ServerLiveState'` (or column missing).
- [ ] **Step 3: Add models to `l4d2web/models.py`**
After the existing `Server` class (around line 137), add the new column to `Server`:
```python
class Server(Base):
__tablename__ = "servers"
__table_args__ = (
UniqueConstraint("user_id", "name", name="uq_servers_user_name"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
blueprint_id: Mapped[int] = mapped_column(ForeignKey("blueprints.id"), nullable=False)
name: Mapped[str] = mapped_column(String(128), nullable=False)
port: Mapped[int] = mapped_column(Integer, unique=True, nullable=False)
desired_state: Mapped[str] = mapped_column(String(16), default="stopped", nullable=False)
actual_state: Mapped[str] = mapped_column(String(16), default="unknown", nullable=False)
actual_state_updated_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
last_error: Mapped[str] = mapped_column(Text, default="", nullable=False)
rcon_password: Mapped[str] = mapped_column(
String(64), nullable=False, default="", server_default=""
)
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
```
Then add at the end of the file (after the last model):
```python
class ServerLiveState(Base):
__tablename__ = "server_live_state"
__table_args__ = (
Index("ix_sls_server_started", "server_id", "started_at"),
{"sqlite_autoincrement": True},
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
server_id: Mapped[int] = mapped_column(
ForeignKey("servers.id", ondelete="CASCADE"), nullable=False
)
started_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
last_seen_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
players: Mapped[int] = mapped_column(Integer, nullable=False)
max_players: Mapped[int] = mapped_column(Integer, nullable=False)
bots: Mapped[int] = mapped_column(Integer, nullable=False)
map: Mapped[str] = mapped_column(String(64), nullable=False)
hibernating: Mapped[bool] = mapped_column(Boolean, nullable=False)
class ServerPlayerSession(Base):
__tablename__ = "server_player_session"
__table_args__ = (
Index("ix_sps_server_open", "server_id", "left_at"),
Index("ix_sps_steam_history", "steam_id_64", "joined_at"),
{"sqlite_autoincrement": True},
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
server_id: Mapped[int] = mapped_column(
ForeignKey("servers.id", ondelete="CASCADE"), nullable=False
)
steam_id_64: Mapped[str] = mapped_column(String(20), nullable=False)
joined_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
left_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
name_at_join: Mapped[str] = mapped_column(String(64), nullable=False)
min_ping: Mapped[int] = mapped_column(Integer, nullable=False)
max_ping: Mapped[int] = mapped_column(Integer, nullable=False)
class SteamUserProfile(Base):
__tablename__ = "steam_user_profile"
steam_id_64: Mapped[str] = mapped_column(String(20), primary_key=True)
persona_name: Mapped[str] = mapped_column(String(64), nullable=False)
avatar_url: Mapped[str] = mapped_column(Text, nullable=False)
fetched_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
```
- [ ] **Step 4: Create the migration file**
Create `l4d2web/alembic/versions/0010_server_live_state.py`:
```python
"""server_live_state schema
Revision ID: 0010_server_live_state
Revises: 0009_user_password_changed_at
Create Date: 2026-05-12
"""
from __future__ import annotations
import secrets
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0010_server_live_state"
down_revision: Union[str, Sequence[str], None] = "0009_user_password_changed_at"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# 1. Add rcon_password to servers, default ''
with op.batch_alter_table("servers") as batch:
batch.add_column(
sa.Column(
"rcon_password",
sa.String(length=64),
nullable=False,
server_default="",
)
)
# 2. Backfill every existing row with a freshly generated password.
conn = op.get_bind()
rows = conn.execute(sa.text("SELECT id FROM servers")).fetchall()
for row in rows:
conn.execute(
sa.text("UPDATE servers SET rcon_password = :pw WHERE id = :id"),
{"pw": secrets.token_urlsafe(32), "id": row.id},
)
# 3. server_live_state — run-length-encoded snapshots
op.create_table(
"server_live_state",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column(
"server_id",
sa.Integer(),
sa.ForeignKey("servers.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("started_at", sa.DateTime(), nullable=False),
sa.Column("last_seen_at", sa.DateTime(), nullable=False),
sa.Column("players", sa.Integer(), nullable=False),
sa.Column("max_players", sa.Integer(), nullable=False),
sa.Column("bots", sa.Integer(), nullable=False),
sa.Column("map", sa.String(length=64), nullable=False),
sa.Column("hibernating", sa.Boolean(), nullable=False),
sqlite_autoincrement=True,
)
op.create_index(
"ix_sls_server_started",
"server_live_state",
["server_id", "started_at"],
)
# 4. server_player_session — connection intervals
op.create_table(
"server_player_session",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column(
"server_id",
sa.Integer(),
sa.ForeignKey("servers.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("steam_id_64", sa.String(length=20), nullable=False),
sa.Column("joined_at", sa.DateTime(), nullable=False),
sa.Column("left_at", sa.DateTime(), nullable=True),
sa.Column("name_at_join", sa.String(length=64), nullable=False),
sa.Column("min_ping", sa.Integer(), nullable=False),
sa.Column("max_ping", sa.Integer(), nullable=False),
sqlite_autoincrement=True,
)
op.create_index("ix_sps_server_open", "server_player_session", ["server_id", "left_at"])
op.create_index(
"ix_sps_steam_history", "server_player_session", ["steam_id_64", "joined_at"]
)
# 5. steam_user_profile — 24h profile cache
op.create_table(
"steam_user_profile",
sa.Column("steam_id_64", sa.String(length=20), primary_key=True),
sa.Column("persona_name", sa.String(length=64), nullable=False),
sa.Column("avatar_url", sa.Text(), nullable=False),
sa.Column("fetched_at", sa.DateTime(), nullable=False),
)
def downgrade() -> None:
op.drop_table("steam_user_profile")
op.drop_index("ix_sps_steam_history", table_name="server_player_session")
op.drop_index("ix_sps_server_open", table_name="server_player_session")
op.drop_table("server_player_session")
op.drop_index("ix_sls_server_started", table_name="server_live_state")
op.drop_table("server_live_state")
with op.batch_alter_table("servers") as batch:
batch.drop_column("rcon_password")
```
- [ ] **Step 5: Add a migration-applies-and-backfills test**
Append to `l4d2web/tests/test_migrations.py` (create it if absent):
```python
import secrets
import subprocess
from pathlib import Path
import pytest
import sqlalchemy as sa
from sqlalchemy import create_engine, inspect
def test_migration_0010_backfills_rcon_password(tmp_path: Path, monkeypatch) -> None:
db_path = tmp_path / "t.db"
db_url = f"sqlite:///{db_path}"
monkeypatch.setenv("DATABASE_URL", db_url)
# Run alembic up to 0009 only, then seed two servers, then upgrade to 0010.
repo_root = Path(__file__).resolve().parents[2]
env = {"DATABASE_URL": db_url, "PATH": "/usr/bin:/bin"}
subprocess.run(
["alembic", "upgrade", "0009_user_password_changed_at"],
cwd=repo_root, env={**env}, check=True,
)
engine = create_engine(db_url)
with engine.begin() as conn:
conn.execute(sa.text(
"INSERT INTO users (id, username, password_digest, admin, active, "
"created_at, updated_at, password_changed_at) "
"VALUES (1, 'u', 'x', 0, 1, '2026-01-01', '2026-01-01', '2026-01-01')"
))
conn.execute(sa.text(
"INSERT INTO blueprints (id, user_id, name, arguments, config, "
"created_at, updated_at) "
"VALUES (1, 1, 'bp', '[]', '[]', '2026-01-01', '2026-01-01')"
))
conn.execute(sa.text(
"INSERT INTO servers (id, user_id, blueprint_id, name, port, "
"desired_state, actual_state, last_error, created_at, updated_at) "
"VALUES (1, 1, 1, 's1', 27600, 'stopped', 'unknown', '', "
"'2026-01-01', '2026-01-01')"
))
conn.execute(sa.text(
"INSERT INTO servers (id, user_id, blueprint_id, name, port, "
"desired_state, actual_state, last_error, created_at, updated_at) "
"VALUES (2, 1, 1, 's2', 27601, 'stopped', 'unknown', '', "
"'2026-01-01', '2026-01-01')"
))
subprocess.run(
["alembic", "upgrade", "0010_server_live_state"],
cwd=repo_root, env={**env}, check=True,
)
with engine.connect() as conn:
rows = conn.execute(sa.text("SELECT id, rcon_password FROM servers ORDER BY id")).fetchall()
assert len(rows) == 2
for row in rows:
assert len(row.rcon_password) >= 32
assert row.rcon_password.replace("_", "").replace("-", "").isalnum()
inspector = inspect(engine)
assert "server_live_state" in inspector.get_table_names()
assert "server_player_session" in inspector.get_table_names()
assert "steam_user_profile" in inspector.get_table_names()
```
If the existing test layout already has a migrations fixture, prefer it. Confirm by skimming `l4d2web/tests/conftest.py` and any `test_migrations.py` once before writing this.
- [ ] **Step 6: Run all the new tests, verify pass**
Run: `pytest l4d2web/tests/test_models.py l4d2web/tests/test_migrations.py -v`
Expected: PASS.
- [ ] **Step 7: Full-suite regression**
Run: `pytest l4d2web/tests -q`
Expected: PASS (no regressions).
- [ ] **Step 8: Commit**
```bash
git add l4d2web/alembic/versions/0010_server_live_state.py l4d2web/models.py \
l4d2web/tests/test_models.py l4d2web/tests/test_migrations.py
git commit -m "feat(live-state): add schema for snapshots, sessions, steam profiles"
```
---
## Task 2: RCON client — protocol handshake + auth
**Files:**
- Create: `l4d2web/services/rcon.py`
- Test: `l4d2web/tests/test_rcon.py`
- [ ] **Step 1: Write the failing handshake tests**
Create `l4d2web/tests/test_rcon.py`:
```python
"""Source RCON client tests against an in-process TCP fixture.
The handshake quirk we verified live: after a SERVERDATA_AUTH (type=3) the
server sends a SERVERDATA_RESPONSE_VALUE (type=0) FIRST and THEN the
SERVERDATA_AUTH_RESPONSE (type=2). The auth response's req_id == -1 means
bad password. The client must consume both packets before sending the
command.
"""
from __future__ import annotations
import socket
import struct
import threading
from contextlib import contextmanager
from typing import Iterator
import pytest
from l4d2web.services.rcon import (
RconAuthError,
RconError,
query_status,
)
def _pack(req_id: int, ptype: int, body: str) -> bytes:
body_bytes = body.encode("utf-8") + b"\x00\x00"
size = 4 + 4 + len(body_bytes)
return struct.pack("<iii", size, req_id, ptype) + body_bytes
def _unpack_one(conn: socket.socket) -> tuple[int, int, str]:
raw_size = conn.recv(4)
size = struct.unpack("<i", raw_size)[0]
payload = b""
while len(payload) < size:
payload += conn.recv(size - len(payload))
req_id, ptype = struct.unpack("<ii", payload[:8])
body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
return req_id, ptype, body
@contextmanager
def fake_rcon_server(handler) -> Iterator[int]:
"""Start a TCP server on an ephemeral port; handler(conn) runs in a thread."""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]
server.settimeout(3.0)
def serve() -> None:
try:
conn, _ = server.accept()
try:
handler(conn)
finally:
conn.close()
except Exception:
pass
t = threading.Thread(target=serve, daemon=True)
t.start()
try:
yield port
finally:
server.close()
t.join(timeout=1.0)
def test_auth_success_then_status(monkeypatch: pytest.MonkeyPatch) -> None:
response_body = (
"hostname: Left 4 Dead 2\n"
"version : 2.2.4.3 9309 secure (unknown)\n"
"udp/ip : 127.0.0.1:27016 [ public 1.2.3.4:27016 ]\n"
"os : Linux Dedicated\n"
"map : c1m2_streets\n"
"players : 1 humans, 0 bots (4 max) (not hibernating) (reserved 1860000e7e5e446)\n"
"\n"
"# userid name uniqueid connected ping loss state rate adr\n"
'# 2 1 "Crone" STEAM_1:0:12376499 00:21 185 20 active 30000 91.55.5.100:27005\n'
"#end\n"
)
def handler(conn: socket.socket) -> None:
req_id, ptype, body = _unpack_one(conn)
assert ptype == 3
assert body == "letmein"
conn.sendall(_pack(req_id, 0, ""))
conn.sendall(_pack(req_id, 2, ""))
cmd_id, cmd_type, cmd = _unpack_one(conn)
assert cmd_type == 2
assert cmd == "status"
conn.sendall(_pack(cmd_id, 0, response_body))
with fake_rcon_server(handler) as port:
result = query_status("127.0.0.1", port, "letmein", timeout=2.0)
assert result.map == "c1m2_streets"
assert result.players == 1
assert result.bots == 0
assert result.max_players == 4
assert result.hibernating is False
assert len(result.roster) == 1
p = result.roster[0]
assert p.name == "Crone"
assert p.steam_id_64 == "76561197985018726" # 76561197960265728 + 0 + 12376499*2
assert p.connected_seconds == 21
assert p.ping == 185
def test_auth_failure_raises(monkeypatch: pytest.MonkeyPatch) -> None:
def handler(conn: socket.socket) -> None:
req_id, _, _ = _unpack_one(conn)
conn.sendall(_pack(req_id, 0, ""))
conn.sendall(_pack(-1, 2, "")) # bad password sentinel
with fake_rcon_server(handler) as port:
with pytest.raises(RconAuthError):
query_status("127.0.0.1", port, "wrong", timeout=2.0)
def test_timeout_raises(monkeypatch: pytest.MonkeyPatch) -> None:
def handler(conn: socket.socket) -> None:
import time
time.sleep(3.0)
with fake_rcon_server(handler) as port:
with pytest.raises(RconError):
query_status("127.0.0.1", port, "x", timeout=0.3)
```
- [ ] **Step 2: Run the failing tests**
Run: `pytest l4d2web/tests/test_rcon.py -v`
Expected: FAIL — `ModuleNotFoundError: No module named 'l4d2web.services.rcon'`.
- [ ] **Step 3: Implement `l4d2web/services/rcon.py`**
```python
"""Source RCON client + status parser.
Pure stdlib. One TCP connection per query — fine at our scale (loopback
~10-20ms round-trip; pooling not worth the complexity).
Source RCON wire format:
size : little-endian int32 (count of the bytes that follow)
req_id: little-endian int32
ptype : little-endian int32
body : utf-8 string, null-terminated
pad : one extra null byte
Packet types:
SERVERDATA_AUTH = 3 (client -> server)
SERVERDATA_EXECCOMMAND = 2 (client -> server)
SERVERDATA_AUTH_RESPONSE= 2 (server -> client)
SERVERDATA_RESPONSE_VALUE = 0 (server -> client)
After auth, the server sends a type=0 empty packet *first* and then the
type=2 auth response. req_id == -1 on the auth response = bad password.
"""
from __future__ import annotations
import re
import socket
import struct
from dataclasses import dataclass
from typing import Iterable
SERVERDATA_AUTH = 3
SERVERDATA_EXECCOMMAND = 2
SERVERDATA_AUTH_RESPONSE = 2
SERVERDATA_RESPONSE_VALUE = 0
_STEAM_ID_BASE = 76561197960265728
class RconError(Exception):
"""Network, timeout, or protocol error."""
class RconAuthError(RconError):
"""The server rejected the password."""
@dataclass(slots=True, frozen=True)
class PlayerRow:
steam_id_64: str
name: str
connected_seconds: int
ping: int
@dataclass(slots=True, frozen=True)
class StatusResponse:
map: str
players: int
max_players: int
bots: int
hibernating: bool
roster: list[PlayerRow]
def query_status(
host: str, port: int, password: str, *, timeout: float = 2.0
) -> StatusResponse:
"""Connect to the RCON port, authenticate, send `status`, return parsed result."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
try:
sock.connect((host, port))
except OSError as exc:
raise RconError(f"connect failed: {exc}") from exc
try:
_send_packet(sock, 1, SERVERDATA_AUTH, password)
# Drain the leading empty type-0 packet; then read the real auth response.
r1 = _recv_packet(sock)
r2 = _recv_packet(sock)
auth = r2 if r1[1] == SERVERDATA_RESPONSE_VALUE else r1
if auth[0] == -1:
raise RconAuthError("bad rcon password")
_send_packet(sock, 2, SERVERDATA_EXECCOMMAND, "status")
_, _, body = _recv_packet(sock)
except (OSError, socket.timeout) as exc:
raise RconError(f"rcon i/o error: {exc}") from exc
finally:
sock.close()
return parse_status(body)
def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None:
body_bytes = body.encode("utf-8") + b"\x00\x00"
size = 4 + 4 + len(body_bytes)
sock.sendall(struct.pack("<iii", size, req_id, ptype) + body_bytes)
def _recv_packet(sock: socket.socket) -> tuple[int, int, str]:
size = struct.unpack("<i", _recvall(sock, 4))[0]
payload = _recvall(sock, size)
req_id, ptype = struct.unpack("<ii", payload[:8])
body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
return req_id, ptype, body
def _recvall(sock: socket.socket, n: int) -> bytes:
data = b""
while len(data) < n:
chunk = sock.recv(n - len(data))
if not chunk:
raise RconError("rcon connection closed")
data += chunk
return data
# --- Status parsing -------------------------------------------------------
_MAP_RE = re.compile(r"^map\s*:\s*(\S+)", re.MULTILINE)
_PLAYERS_RE = re.compile(
r"^players\s*:\s*(\d+)\s+humans,\s*(\d+)\s+bots\s*\((\d+)\s+max\)"
r"\s*\((not hibernating|hibernating)\)",
re.MULTILINE,
)
# A status player row: starts with `#`, then variable numeric prefixes,
# then a quoted name, then STEAM_X:Y:Z, then connected time, then ping.
_PLAYER_RE = re.compile(
r'^#\s+(?:\d+\s+)+"(?P<name>[^"]*)"\s+'
r"(?P<sid>STEAM_\d+:(?P<y>\d+):(?P<z>\d+))\s+"
r"(?P<connected>[\d:]+)\s+"
r"(?P<ping>\d+)\s+",
re.MULTILINE,
)
def parse_status(body: str) -> StatusResponse:
map_match = _MAP_RE.search(body)
if not map_match:
raise RconError(f"status: no map line in response\n{body!r}")
players_match = _PLAYERS_RE.search(body)
if not players_match:
raise RconError(f"status: no players line\n{body!r}")
roster: list[PlayerRow] = []
for m in _PLAYER_RE.finditer(body):
y = int(m.group("y"))
z = int(m.group("z"))
roster.append(
PlayerRow(
steam_id_64=str(_STEAM_ID_BASE + (z * 2) + y),
name=m.group("name"),
connected_seconds=_parse_duration(m.group("connected")),
ping=int(m.group("ping")),
)
)
return StatusResponse(
map=map_match.group(1),
players=int(players_match.group(1)),
bots=int(players_match.group(2)),
max_players=int(players_match.group(3)),
hibernating=(players_match.group(4) == "hibernating"),
roster=roster,
)
def _parse_duration(text: str) -> int:
"""Parse Source's connected duration: HH:MM:SS or MM:SS -> seconds."""
parts = [int(p) for p in text.split(":")]
if len(parts) == 2:
return parts[0] * 60 + parts[1]
if len(parts) == 3:
return parts[0] * 3600 + parts[1] * 60 + parts[2]
raise RconError(f"unparseable connected duration: {text!r}")
```
Note on Steam ID conversion: `STEAM_X:Y:Z` → SteamID64 = `76561197960265728 + Z*2 + Y`. The earlier inline math in this plan said `Y*2 + Z`; the correct formula is `Z*2 + Y` (Y is the lowest bit). The test expectation `76561197985018726` reflects the correct math for `STEAM_1:0:12376499`.
- [ ] **Step 4: Run the RCON tests, verify pass**
Run: `pytest l4d2web/tests/test_rcon.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add l4d2web/services/rcon.py l4d2web/tests/test_rcon.py
git commit -m "feat(rcon): add Source RCON client + status parser"
```
---
## Task 3: Append RCON password to spec yaml
**Files:**
- Modify: `l4d2web/services/l4d2_facade.py:28-52`
- Test: `l4d2web/tests/test_l4d2_facade.py`
- [ ] **Step 1: Write the failing facade tests**
Append to `l4d2web/tests/test_l4d2_facade.py`:
```python
from l4d2web.models import Blueprint, Server
from l4d2web.services.l4d2_facade import build_server_spec_payload
def _make_server_blueprint(rcon: str = "") -> tuple[Server, Blueprint]:
bp = Blueprint(
id=1, user_id=1, name="bp",
arguments='["-tickrate","60"]',
config='["sv_consistency 1","mp_gamemode coop"]',
)
srv = Server(
id=1, user_id=1, blueprint_id=1, name="s", port=27500,
rcon_password=rcon,
)
return srv, bp
def test_build_server_spec_payload_appends_rcon_password_last() -> None:
srv, bp = _make_server_blueprint(rcon="topsecret123")
overlays = [(7, "/overlays/7", True), (8, "/overlays/8", False)]
spec = build_server_spec_payload(srv, bp, overlays)
cfg = spec["config"]
# rcon_password line is the LAST entry — overlay exec lines + blueprint
# config + rcon_password.
assert cfg[-1] == 'rcon_password "topsecret123"'
# Lines before our injection still contain the blueprint config.
assert "sv_consistency 1" in cfg
assert "mp_gamemode coop" in cfg
def test_build_server_spec_payload_omits_rcon_password_when_empty() -> None:
srv, bp = _make_server_blueprint(rcon="")
spec = build_server_spec_payload(srv, bp, [])
for line in spec["config"]:
assert not line.startswith("rcon_password ")
```
- [ ] **Step 2: Run failing tests**
Run: `pytest l4d2web/tests/test_l4d2_facade.py -v -k rcon_password`
Expected: FAIL — assertion errors (line not appended).
- [ ] **Step 3: Modify `l4d2web/services/l4d2_facade.py:28-52`**
Replace the existing `build_server_spec_payload` body that ends with `return { ... "config": exec_lines + json.loads(blueprint.config) }` with:
```python
def build_server_spec_payload(
server: Server,
blueprint: Blueprint,
overlay_rows: list[tuple[int, str, bool]],
) -> dict:
overlays: list[dict] = []
for overlay_id, path, expose in overlay_rows:
if expose:
overlays.append({"path": path, "alias": f"overlay_{overlay_id}"})
else:
overlays.append({"path": path})
# Source `exec` is last-wins. First list entry = topmost overlay = highest
# precedence, so its exec runs LAST. Emit in reverse position order.
exec_lines = [
f"exec server_overlay_{overlay_id}"
for overlay_id, _, expose in reversed(overlay_rows)
if expose
]
config_lines: list[str] = exec_lines + json.loads(blueprint.config)
# rcon_password is appended LAST so neither overlays nor user blueprint
# config can override it (Source's cvar semantics are last-wins).
if server.rcon_password:
config_lines.append(f'rcon_password "{server.rcon_password}"')
return {
"port": server.port,
"overlays": overlays,
"arguments": json.loads(blueprint.arguments),
"config": config_lines,
}
```
- [ ] **Step 4: Run, verify pass**
Run: `pytest l4d2web/tests/test_l4d2_facade.py -v`
Expected: PASS (all existing + new).
- [ ] **Step 5: Commit**
```bash
git add l4d2web/services/l4d2_facade.py l4d2web/tests/test_l4d2_facade.py
git commit -m "feat(facade): append rcon_password as final server.cfg line"
```
---
## Task 4: Generate `rcon_password` on server create
**Files:**
- Modify: `l4d2web/routes/server_routes.py:58-83`
- Test: `l4d2web/tests/test_server_routes.py` (extend)
- [ ] **Step 1: Write the failing test**
Append to `l4d2web/tests/test_server_routes.py`:
```python
def test_create_server_generates_rcon_password(user_client) -> None:
res = user_client.post(
"/servers",
data={"name": "fresh", "blueprint_id": "1"},
headers={"X-CSRF-Token": "test-token"},
)
assert res.status_code in (200, 201, 302)
with session_scope() as db:
row = db.scalar(select(Server).where(Server.name == "fresh"))
assert row is not None
assert len(row.rcon_password) >= 32
```
If the `user_client` fixture and a default blueprint seed already exist in the test suite, reuse them. If not, mirror the existing `create_server` test you find next to this one.
- [ ] **Step 2: Run, verify it fails**
Run: `pytest l4d2web/tests/test_server_routes.py -v -k rcon_password`
Expected: FAIL — `rcon_password` is empty.
- [ ] **Step 3: Modify `create_server` in `l4d2web/routes/server_routes.py:58-83`**
At the top of the file, add to the imports:
```python
import secrets
```
In `create_server`, replace the `Server(...)` construction (around line 87) with:
```python
server = Server(
user_id=user.id,
blueprint_id=blueprint.id,
name=name,
port=port,
desired_state="stopped",
actual_state="unknown",
last_error="",
rcon_password=secrets.token_urlsafe(32),
)
```
- [ ] **Step 4: Run, verify pass**
Run: `pytest l4d2web/tests/test_server_routes.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add l4d2web/routes/server_routes.py l4d2web/tests/test_server_routes.py
git commit -m "feat(servers): generate rcon_password on server create"
```
---
## Task 5: Steam Web API client
**Files:**
- Create: `l4d2web/services/steam_users.py`
- Test: `l4d2web/tests/test_steam_users.py`
- [ ] **Step 1: Write the failing tests**
Create `l4d2web/tests/test_steam_users.py`:
```python
from __future__ import annotations
from typing import Any
import pytest
from l4d2web.services import steam_users
class _FakeResponse:
def __init__(self, json_body: dict[str, Any], status: int = 200) -> None:
self._body = json_body
self.status_code = status
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise RuntimeError(f"http {self.status_code}")
def json(self) -> dict[str, Any]:
return self._body
def _patched_get(monkeypatch: pytest.MonkeyPatch, body: dict, capture: list) -> None:
def fake_get(url: str, params: dict, timeout: float = 30.0) -> _FakeResponse:
capture.append({"url": url, "params": params, "timeout": timeout})
return _FakeResponse(body)
monkeypatch.setattr(steam_users, "_session_get", fake_get)
def test_fetch_profiles_batch_builds_correct_request(monkeypatch: pytest.MonkeyPatch) -> None:
captured: list = []
body = {"response": {"players": [
{"steamid": "76561197960828710", "personaname": "Alice",
"avatarmedium": "https://avatars.../alice_medium.jpg"},
]}}
_patched_get(monkeypatch, body, captured)
profiles = steam_users.fetch_profiles_batch(
["76561197960828710", "76561198021234567"], api_key="KEY"
)
assert captured[0]["url"].endswith("/GetPlayerSummaries/v0002/")
assert captured[0]["params"]["key"] == "KEY"
assert captured[0]["params"]["steamids"] == "76561197960828710,76561198021234567"
assert len(profiles) == 1
p = profiles[0]
assert p.steam_id_64 == "76561197960828710"
assert p.persona_name == "Alice"
assert p.avatar_url.endswith("alice_medium.jpg")
def test_fetch_profiles_batch_skips_private_or_missing(monkeypatch: pytest.MonkeyPatch) -> None:
# The Steam API simply omits non-resolvable IDs from the response. Caller
# should accept that and return only what's there.
body = {"response": {"players": [
{"steamid": "76561197960828710", "personaname": "Alice",
"avatarmedium": "https://avatars.../alice_medium.jpg"},
]}}
_patched_get(monkeypatch, body, [])
profiles = steam_users.fetch_profiles_batch(
["76561197960828710", "76561197999999999"], api_key="KEY"
)
assert len(profiles) == 1
assert profiles[0].steam_id_64 == "76561197960828710"
def test_fetch_profiles_batch_chunks_by_100(monkeypatch: pytest.MonkeyPatch) -> None:
ids = [str(76561197960000000 + i) for i in range(150)]
calls: list = []
body = {"response": {"players": []}}
_patched_get(monkeypatch, body, calls)
steam_users.fetch_profiles_batch(ids, api_key="KEY")
assert len(calls) == 2
assert calls[0]["params"]["steamids"].count(",") == 99 # 100 ids -> 99 commas
assert calls[1]["params"]["steamids"].count(",") == 49 # 50 ids
```
- [ ] **Step 2: Run, verify failures**
Run: `pytest l4d2web/tests/test_steam_users.py -v`
Expected: FAIL — module missing.
- [ ] **Step 3: Implement `l4d2web/services/steam_users.py`**
```python
"""Steam Web API client for player profile lookups.
Mirrors the shape of l4d2web/services/steam_workshop.py:17-43:
- single thread-local requests.Session
- 30s timeout
- HTTPS only
Difference: GetPlayerSummaries requires an API key in the querystring,
unlike the anonymous workshop endpoints.
"""
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Iterable
import requests
GET_PLAYER_SUMMARIES_URL = (
"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/"
)
REQUEST_TIMEOUT_SECONDS = 30.0
MAX_IDS_PER_CALL = 100
_session_local = threading.local()
def _session() -> requests.Session:
sess = getattr(_session_local, "session", None)
if sess is None:
sess = requests.Session()
_session_local.session = sess
return sess
def _session_get(url: str, params: dict, timeout: float = REQUEST_TIMEOUT_SECONDS):
"""Indirection seam so tests can monkeypatch a fake here."""
return _session().get(url, params=params, timeout=timeout)
@dataclass(slots=True, frozen=True)
class SteamProfile:
steam_id_64: str
persona_name: str
avatar_url: str
def fetch_profiles_batch(
steam_ids: Iterable[str], *, api_key: str
) -> list[SteamProfile]:
"""Resolve a batch of SteamID64 strings to persona name + avatar URL.
Steam's API caps each call at 100 IDs; this helper chunks transparently.
IDs that Steam can't resolve (private, deleted) are simply absent from
the response and from the returned list.
"""
ids = list(steam_ids)
out: list[SteamProfile] = []
for i in range(0, len(ids), MAX_IDS_PER_CALL):
chunk = ids[i : i + MAX_IDS_PER_CALL]
params = {"key": api_key, "steamids": ",".join(chunk)}
resp = _session_get(GET_PLAYER_SUMMARIES_URL, params=params)
resp.raise_for_status()
payload = resp.json() or {}
players = (payload.get("response") or {}).get("players") or []
for p in players:
out.append(
SteamProfile(
steam_id_64=str(p["steamid"]),
persona_name=str(p.get("personaname", "")),
avatar_url=str(p.get("avatarmedium", "")),
)
)
return out
```
- [ ] **Step 4: Run, verify pass**
Run: `pytest l4d2web/tests/test_steam_users.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add l4d2web/services/steam_users.py l4d2web/tests/test_steam_users.py
git commit -m "feat(steam): add GetPlayerSummaries client"
```
---
## Task 6: Live-state poller — RLE snapshot writer
**Files:**
- Create: `l4d2web/services/live_state_poller.py`
- Test: `l4d2web/tests/test_live_state_poller.py`
- [ ] **Step 1: Write the failing RLE snapshot test**
Create `l4d2web/tests/test_live_state_poller.py`:
```python
"""Live-state poller tests.
Each test seeds an app + DB, monkeypatches the RCON client to return a
canned StatusResponse, and asserts on what the poller writes.
"""
from __future__ import annotations
from datetime import datetime, timedelta, UTC
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.db import init_db, session_scope
from l4d2web.models import (
Blueprint,
Server,
ServerLiveState,
User,
)
from l4d2web.services import live_state_poller
from l4d2web.services.rcon import PlayerRow, StatusResponse
def _seed(tmp_path):
db_url = f"sqlite:///{tmp_path/'p.db'}"
import os
os.environ["DATABASE_URL"] = db_url
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "x"})
init_db()
with session_scope() as db:
u = User(username="u", password_digest="x"); db.add(u); db.flush()
bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
s = Server(
user_id=u.id, blueprint_id=bp.id, name="s", port=27500,
rcon_password="pw", actual_state="running",
)
db.add(s); db.flush()
return app, s.id
def _status(players: int, map_: str = "c1m1_hotel", hibernating: bool = False,
roster: list[PlayerRow] | None = None) -> StatusResponse:
return StatusResponse(
map=map_, players=players, max_players=4, bots=0,
hibernating=hibernating, roster=roster or [],
)
def test_rle_bumps_last_seen_when_state_unchanged(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
monkeypatch.setattr(
live_state_poller, "query_status",
lambda host, port, password, timeout: _status(players=0),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
rows = db.scalars(
select(ServerLiveState).where(ServerLiveState.server_id == sid)
.order_by(ServerLiveState.started_at)
).all()
assert len(rows) == 1
assert rows[0].players == 0
assert rows[0].last_seen_at >= rows[0].started_at
def test_rle_inserts_new_row_on_state_change(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
snapshots = iter([_status(players=0), _status(players=1)])
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: next(snapshots),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
rows = db.scalars(
select(ServerLiveState).where(ServerLiveState.server_id == sid)
.order_by(ServerLiveState.started_at)
).all()
assert [r.players for r in rows] == [0, 1]
def test_skips_servers_without_rcon_password(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
with session_scope() as db:
s = db.scalar(select(Server).where(Server.id == sid))
s.rcon_password = ""
called: list = []
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: called.append(1) or _status(0),
)
with app.app_context():
live_state_poller.poll_once()
assert called == []
def test_skips_non_running_servers(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
with session_scope() as db:
s = db.scalar(select(Server).where(Server.id == sid))
s.actual_state = "stopped"
called: list = []
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: called.append(1) or _status(0),
)
with app.app_context():
live_state_poller.poll_once()
assert called == []
```
- [ ] **Step 2: Run, verify failures**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v -k rle or skip`
Expected: FAIL — module missing.
- [ ] **Step 3: Implement the poller (snapshot-only for now)**
Create `l4d2web/services/live_state_poller.py`:
```python
"""Background poller that maintains live game-server state in the DB.
Modeled on l4d2web/services/job_worker.py:617-647. This module owns:
- per-server snapshot writes with run-length encoding into
`server_live_state`
- player-session lifecycle in `server_player_session` (Task 7)
- Steam profile enrichment into `steam_user_profile` (Task 8)
- retention pruning and stuck-session closure (Task 10)
This file is built up across Tasks 6-10.
"""
from __future__ import annotations
import logging
import threading
import time
from datetime import datetime, UTC
from typing import Callable
from sqlalchemy import select
from l4d2web.db import session_scope
from l4d2web.models import (
Server,
ServerLiveState,
)
from l4d2web.services.rcon import RconError, StatusResponse, query_status
logger = logging.getLogger(__name__)
def _now() -> datetime:
return datetime.now(UTC).replace(tzinfo=None)
def poll_once() -> None:
"""One pass over all running servers with a configured rcon_password."""
with session_scope() as db:
servers = db.scalars(
select(Server)
.where(Server.actual_state == "running")
.where(Server.rcon_password != "")
).all()
targets = [(s.id, s.port, s.rcon_password) for s in servers]
for server_id, port, password in targets:
try:
status = query_status("127.0.0.1", port, password, timeout=2.0)
except RconError:
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
continue
_record_snapshot(server_id, status)
def _record_snapshot(server_id: int, status: StatusResponse) -> None:
"""RLE write: bump last_seen_at if state matches, else insert a new row."""
now = _now()
with session_scope() as db:
latest = db.scalars(
select(ServerLiveState)
.where(ServerLiveState.server_id == server_id)
.order_by(ServerLiveState.started_at.desc())
.limit(1)
).first()
if latest is not None and _matches(latest, status):
latest.last_seen_at = now
return
db.add(
ServerLiveState(
server_id=server_id,
started_at=now,
last_seen_at=now,
players=status.players,
max_players=status.max_players,
bots=status.bots,
map=status.map,
hibernating=status.hibernating,
)
)
def _matches(row: ServerLiveState, status: StatusResponse) -> bool:
return (
row.players == status.players
and row.max_players == status.max_players
and row.bots == status.bots
and row.map == status.map
and row.hibernating == status.hibernating
)
```
- [ ] **Step 4: Run, verify pass**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v -k "rle or skip"`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add l4d2web/services/live_state_poller.py l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): poller writes RLE snapshots to server_live_state"
```
---
## Task 7: Live-state poller — session reconciliation
**Files:**
- Modify: `l4d2web/services/live_state_poller.py`
- Modify: `l4d2web/tests/test_live_state_poller.py`
- [ ] **Step 1: Write the failing session tests**
Append to `l4d2web/tests/test_live_state_poller.py`:
```python
from l4d2web.models import ServerPlayerSession
def _player(steam_id: str = "76561197960828710", name: str = "Alice",
connected: int = 21, ping: int = 60) -> PlayerRow:
return PlayerRow(
steam_id_64=steam_id, name=name, connected_seconds=connected, ping=ping,
)
def test_new_player_opens_session_with_backfilled_join(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player(connected=30)]),
)
with app.app_context():
live_state_poller.poll_once()
with session_scope() as db:
sessions = db.scalars(
select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
).all()
assert len(sessions) == 1
s = sessions[0]
assert s.steam_id_64 == "76561197960828710"
assert s.name_at_join == "Alice"
assert s.left_at is None
assert s.min_ping == 60
assert s.max_ping == 60
# joined_at should be ~30s before now
delta = (datetime.now(UTC).replace(tzinfo=None) - s.joined_at).total_seconds()
assert 25 <= delta <= 60
def test_session_closes_when_player_no_longer_in_roster(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
snapshots = iter([
_status(players=1, roster=[_player()]),
_status(players=0, roster=[]),
])
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: next(snapshots),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
sessions = db.scalars(
select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
).all()
assert len(sessions) == 1
assert sessions[0].left_at is not None
def test_session_ping_range_extends(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
snapshots = iter([
_status(players=1, roster=[_player(ping=60)]),
_status(players=1, roster=[_player(ping=200)]),
_status(players=1, roster=[_player(ping=40)]),
])
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: next(snapshots),
)
with app.app_context():
live_state_poller.poll_once()
live_state_poller.poll_once()
live_state_poller.poll_once()
with session_scope() as db:
s = db.scalar(select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid))
assert s.min_ping == 40
assert s.max_ping == 200
def test_session_skips_bots(tmp_path, monkeypatch) -> None:
# Bots have non-STEAM uniqueid; our parser already drops them, but verify
# that even if a non-STEAM steam_id_64 makes it through, sessions filter
# it out. We exercise the filter directly by giving a string that doesn't
# match the expected SteamID64 numeric form.
app, sid = _seed(tmp_path)
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(
players=0, roster=[_player(steam_id="BOT", name="Coach")]
),
)
with app.app_context():
live_state_poller.poll_once()
with session_scope() as db:
sessions = db.scalars(
select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
).all()
assert sessions == []
```
- [ ] **Step 2: Run, verify failures**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v -k session`
Expected: FAIL.
- [ ] **Step 3: Extend `live_state_poller.py` with session reconciliation**
Update the imports at the top:
```python
from datetime import datetime, timedelta, UTC
from sqlalchemy import and_, select
from l4d2web.models import (
Server,
ServerLiveState,
ServerPlayerSession,
)
from l4d2web.services.rcon import PlayerRow, RconError, StatusResponse, query_status
```
Replace the `poll_once` body with:
```python
def poll_once() -> None:
"""One pass over all running servers with a configured rcon_password."""
with session_scope() as db:
servers = db.scalars(
select(Server)
.where(Server.actual_state == "running")
.where(Server.rcon_password != "")
).all()
targets = [(s.id, s.port, s.rcon_password) for s in servers]
for server_id, port, password in targets:
try:
status = query_status("127.0.0.1", port, password, timeout=2.0)
except RconError:
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
continue
_record_snapshot(server_id, status)
_reconcile_sessions(server_id, status)
```
Add the session reconciler:
```python
_STEAM_ID_64_PREFIX = "7656" # all SteamID64s start with this; bots/anon do not
def _is_valid_steam_id_64(value: str) -> bool:
return value.startswith(_STEAM_ID_64_PREFIX) and value.isdigit() and len(value) >= 16
def _reconcile_sessions(server_id: int, status: StatusResponse) -> None:
"""Open new sessions, update ping ranges, close departed sessions."""
now = _now()
roster = [p for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)]
seen_ids = {p.steam_id_64 for p in roster}
with session_scope() as db:
open_rows = db.scalars(
select(ServerPlayerSession).where(
ServerPlayerSession.server_id == server_id,
ServerPlayerSession.left_at.is_(None),
)
).all()
open_by_sid = {r.steam_id_64: r for r in open_rows}
# Close sessions for players no longer in the roster.
for sid, row in open_by_sid.items():
if sid not in seen_ids:
row.left_at = now
# Open / update sessions for current roster.
for p in roster:
existing = open_by_sid.get(p.steam_id_64)
if existing is None:
db.add(
ServerPlayerSession(
server_id=server_id,
steam_id_64=p.steam_id_64,
joined_at=now - timedelta(seconds=p.connected_seconds),
left_at=None,
name_at_join=p.name,
min_ping=p.ping,
max_ping=p.ping,
)
)
else:
if p.ping < existing.min_ping:
existing.min_ping = p.ping
if p.ping > existing.max_ping:
existing.max_ping = p.ping
```
- [ ] **Step 4: Run, verify pass**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v`
Expected: PASS (all earlier tests + new session tests).
- [ ] **Step 5: Commit**
```bash
git add l4d2web/services/live_state_poller.py l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): reconcile player sessions on each poll"
```
---
## Task 8: Live-state poller — Steam profile enrichment
**Files:**
- Modify: `l4d2web/services/live_state_poller.py`
- Modify: `l4d2web/tests/test_live_state_poller.py`
- [ ] **Step 1: Write the failing enrichment tests**
Append to `l4d2web/tests/test_live_state_poller.py`:
```python
from l4d2web.models import SteamUserProfile
from l4d2web.services import steam_users
def test_enriches_missing_profile(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STEAM_WEB_API_KEY"] = "KEY"
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player()]),
)
captured: list = []
def fake_fetch(ids, *, api_key):
captured.append((list(ids), api_key))
return [steam_users.SteamProfile(
steam_id_64="76561197960828710",
persona_name="Alice",
avatar_url="https://avatars.../alice_medium.jpg",
)]
monkeypatch.setattr(live_state_poller, "fetch_profiles_batch", fake_fetch)
with app.app_context():
live_state_poller.poll_once()
assert captured and captured[0][1] == "KEY"
with session_scope() as db:
p = db.scalar(select(SteamUserProfile))
assert p is not None
assert p.persona_name == "Alice"
def test_skips_enrichment_when_api_key_unset(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STEAM_WEB_API_KEY"] = ""
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player()]),
)
monkeypatch.setattr(
live_state_poller, "fetch_profiles_batch",
lambda ids, *, api_key: pytest.fail("must not call without key"),
)
with app.app_context():
live_state_poller.poll_once()
def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STEAM_WEB_API_KEY"] = "KEY"
with session_scope() as db:
db.add(SteamUserProfile(
steam_id_64="76561197960828710",
persona_name="cached",
avatar_url="cached.jpg",
fetched_at=datetime.now(UTC).replace(tzinfo=None),
))
monkeypatch.setattr(
live_state_poller, "query_status",
lambda *a, **kw: _status(players=1, roster=[_player()]),
)
called: list = []
monkeypatch.setattr(
live_state_poller, "fetch_profiles_batch",
lambda ids, *, api_key: called.append(list(ids)) or [],
)
with app.app_context():
live_state_poller.poll_once()
assert called == []
```
- [ ] **Step 2: Run, verify failures**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v -k enrich or api_key or cache_is_fresh`
Expected: FAIL.
- [ ] **Step 3: Extend `live_state_poller.py` with enrichment**
Update imports:
```python
from flask import current_app
from l4d2web.models import (
Server,
ServerLiveState,
ServerPlayerSession,
SteamUserProfile,
)
from l4d2web.services.steam_users import SteamProfile, fetch_profiles_batch
```
Replace `poll_once` (now adding an enrichment phase at the end of each per-server iteration):
```python
def poll_once() -> None:
with session_scope() as db:
servers = db.scalars(
select(Server)
.where(Server.actual_state == "running")
.where(Server.rcon_password != "")
).all()
targets = [(s.id, s.port, s.rcon_password) for s in servers]
api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or ""
ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400))
for server_id, port, password in targets:
try:
status = query_status("127.0.0.1", port, password, timeout=2.0)
except RconError:
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
continue
_record_snapshot(server_id, status)
_reconcile_sessions(server_id, status)
if api_key:
_enrich_profiles(status, api_key=api_key, ttl_seconds=ttl_seconds)
def _enrich_profiles(status: StatusResponse, *, api_key: str, ttl_seconds: int) -> None:
"""Fetch+cache Steam profile data for any roster IDs missing or stale."""
roster_ids = {p.steam_id_64 for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)}
if not roster_ids:
return
cutoff = _now() - timedelta(seconds=ttl_seconds)
with session_scope() as db:
fresh = set(db.scalars(
select(SteamUserProfile.steam_id_64).where(
SteamUserProfile.steam_id_64.in_(roster_ids),
SteamUserProfile.fetched_at >= cutoff,
)
).all())
needs_fetch = sorted(roster_ids - fresh)
if not needs_fetch:
return
try:
profiles = fetch_profiles_batch(needs_fetch, api_key=api_key)
except Exception: # network / API errors are soft-fail
logger.warning("steam profile enrichment failed", exc_info=True)
return
now = _now()
with session_scope() as db:
for p in profiles:
row = db.get(SteamUserProfile, p.steam_id_64)
if row is None:
db.add(SteamUserProfile(
steam_id_64=p.steam_id_64,
persona_name=p.persona_name,
avatar_url=p.avatar_url,
fetched_at=now,
))
else:
row.persona_name = p.persona_name
row.avatar_url = p.avatar_url
row.fetched_at = now
```
- [ ] **Step 4: Run, verify pass**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add l4d2web/services/live_state_poller.py l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): enrich roster with cached Steam profiles"
```
---
## Task 9: Live-state poller — retention + stuck-session close + thread startup
**Files:**
- Modify: `l4d2web/services/live_state_poller.py`
- Modify: `l4d2web/app.py`
- Modify: `l4d2web/config.py`
- Modify: `l4d2web/tests/test_live_state_poller.py`
- [ ] **Step 1: Write the failing tests**
Append to `l4d2web/tests/test_live_state_poller.py`:
```python
def test_retention_trims_old_rows(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["LIVE_STATE_HISTORY_DAYS"] = 30
long_ago = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=45)
with session_scope() as db:
db.add(ServerLiveState(
server_id=sid, started_at=long_ago, last_seen_at=long_ago,
players=0, max_players=4, bots=0, map="old", hibernating=True,
))
db.add(ServerPlayerSession(
server_id=sid, steam_id_64="76561197960828710",
joined_at=long_ago, left_at=long_ago,
name_at_join="OldPlayer", min_ping=10, max_ping=10,
))
with app.app_context():
live_state_poller.prune_history()
with session_scope() as db:
snaps = db.scalars(select(ServerLiveState)).all()
sess = db.scalars(select(ServerPlayerSession)).all()
assert snaps == []
assert sess == []
def test_close_stuck_sessions_after_threshold(tmp_path, monkeypatch) -> None:
app, sid = _seed(tmp_path)
app.config["STUCK_SESSION_SECONDS"] = 60
way_back = datetime.now(UTC).replace(tzinfo=None) - timedelta(hours=2)
with session_scope() as db:
db.add(ServerPlayerSession(
server_id=sid, steam_id_64="76561197960828710",
joined_at=way_back, left_at=None,
name_at_join="GhostPlayer", min_ping=10, max_ping=10,
))
# Server is in `targets` but the RCON call fails — i.e., we haven't seen
# this server respond in a long time. Poller must close stuck sessions.
def boom(*a, **kw):
from l4d2web.services.rcon import RconError
raise RconError("simulated outage")
monkeypatch.setattr(live_state_poller, "query_status", boom)
with app.app_context():
live_state_poller.poll_once()
with session_scope() as db:
row = db.scalar(select(ServerPlayerSession))
assert row.left_at is not None
def test_start_live_state_poller_skipped_during_testing(monkeypatch, tmp_path) -> None:
from l4d2web import app as app_module
called: list = []
monkeypatch.setattr(
app_module, "start_live_state_poller", lambda app: called.append(app)
)
app_module.create_app({"TESTING": True, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"})
assert called == []
def test_start_live_state_poller_started_outside_testing(monkeypatch, tmp_path) -> None:
from l4d2web import app as app_module
called: list = []
monkeypatch.setattr(
app_module, "start_live_state_poller", lambda app: called.append(app)
)
app = app_module.create_app(
{"TESTING": False, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"}
)
assert called == [app]
```
- [ ] **Step 2: Run, verify failures**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v -k retention or stuck or start_live_state_poller`
Expected: FAIL.
- [ ] **Step 3: Extend `live_state_poller.py`**
Add to the module (after `_enrich_profiles`):
```python
def prune_history() -> None:
"""Delete snapshots and closed sessions older than retention."""
days = int(current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30))
cutoff = _now() - timedelta(days=days)
with session_scope() as db:
db.query(ServerLiveState).filter(
ServerLiveState.last_seen_at < cutoff
).delete(synchronize_session=False)
db.query(ServerPlayerSession).filter(
ServerPlayerSession.left_at.is_not(None),
ServerPlayerSession.left_at < cutoff,
).delete(synchronize_session=False)
def _close_stuck_sessions(server_id: int) -> None:
"""If a server has open sessions whose joined_at is older than threshold
AND we've failed to observe them, close them as stuck."""
seconds = int(current_app.config.get("STUCK_SESSION_SECONDS", 60))
cutoff = _now() - timedelta(seconds=seconds)
with session_scope() as db:
rows = db.scalars(
select(ServerPlayerSession).where(
ServerPlayerSession.server_id == server_id,
ServerPlayerSession.left_at.is_(None),
ServerPlayerSession.joined_at < cutoff,
)
).all()
for row in rows:
row.left_at = _now()
```
Update `poll_once` to call `_close_stuck_sessions` when RCON fails:
```python
def poll_once() -> None:
with session_scope() as db:
servers = db.scalars(
select(Server)
.where(Server.actual_state == "running")
.where(Server.rcon_password != "")
).all()
targets = [(s.id, s.port, s.rcon_password) for s in servers]
api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or ""
ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400))
timeout = float(current_app.config.get("LIVE_STATE_QUERY_TIMEOUT_SECONDS", 2.0))
for server_id, port, password in targets:
try:
status = query_status("127.0.0.1", port, password, timeout=timeout)
except RconError:
logger.warning("rcon query failed for server %d", server_id, exc_info=True)
_close_stuck_sessions(server_id)
continue
_record_snapshot(server_id, status)
_reconcile_sessions(server_id, status)
if api_key:
_enrich_profiles(status, api_key=api_key, ttl_seconds=ttl_seconds)
```
Add thread-start helpers at the end:
```python
_poller_started_lock = threading.Lock()
_poller_started = False
def start_live_state_poller(app) -> None:
"""Spawn the daemon poller thread once per process."""
global _poller_started
with _poller_started_lock:
if _poller_started:
return
_poller_started = True
interval = float(app.config.get("LIVE_STATE_POLL_SECONDS", 5))
retention_every = max(1, int(app.config.get("LIVE_STATE_RETENTION_EVERY_TICKS", 60)))
thread = threading.Thread(
target=_poller_loop,
args=(app, interval, retention_every),
name="left4me-live-state-poller",
daemon=True,
)
thread.start()
def _poller_loop(app, interval: float, retention_every: int) -> None:
tick = 0
while True:
try:
with app.app_context():
poll_once()
if tick % retention_every == 0:
prune_history()
except Exception:
logger.exception("live-state poller loop exception")
tick += 1
time.sleep(interval)
```
- [ ] **Step 4: Register config keys in `l4d2web/config.py`**
Find the existing config defaults block (mirroring `JOB_WORKER_POLL_SECONDS`) and add:
```python
"LIVE_STATE_POLL_SECONDS": 5,
"LIVE_STATE_QUERY_TIMEOUT_SECONDS": 2.0,
"LIVE_STATE_STALE_SECONDS": 30,
"LIVE_STATE_HISTORY_DAYS": 30,
"LIVE_STATE_RETENTION_EVERY_TICKS": 60,
"STUCK_SESSION_SECONDS": 60,
"STEAM_PROFILE_TTL_SECONDS": 86400,
"STEAM_WEB_API_KEY": "",
```
The env-var loading should follow the existing pattern; if `STEAM_WEB_API_KEY` is sourced from `web.env`, ensure the existing env-loader picks it up (it likely already does via a generic mechanism — verify before duplicating logic).
- [ ] **Step 5: Wire into `l4d2web/app.py`**
Find the line that calls `start_state_poller(app)` (or `start_job_workers(app)`). Immediately below it, add:
```python
from l4d2web.services.live_state_poller import start_live_state_poller
# ...
if not app.config.get("TESTING"):
start_live_state_poller(app)
```
If `start_state_poller` already runs only outside TESTING, hook in the same way.
- [ ] **Step 6: Run, verify pass**
Run: `pytest l4d2web/tests/test_live_state_poller.py -v`
Expected: PASS.
Run: `pytest l4d2web/tests -q`
Expected: PASS (no regressions).
- [ ] **Step 7: Commit**
```bash
git add l4d2web/services/live_state_poller.py l4d2web/app.py l4d2web/config.py \
l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): start daemon poller, prune history, close stuck sessions"
```
---
## Task 10: Server-list live-state badge
**Files:**
- Modify: `l4d2web/routes/page_routes.py` (or wherever `/servers` index renders — confirm)
- Modify: `l4d2web/templates/servers.html`
- Test: `l4d2web/tests/test_server_routes.py` (or `test_page_routes.py`)
- [ ] **Step 1: Locate the servers index handler**
Run: `grep -n "servers\.html\|/servers\b" l4d2web/routes/*.py`
Note the handler function and which route renders `servers.html`.
- [ ] **Step 2: Write the failing test**
Append to the appropriate test file:
```python
def test_servers_index_renders_live_state_badge(user_client) -> None:
# Seed one server with a recent snapshot, one without.
now = datetime.now(UTC).replace(tzinfo=None)
with session_scope() as db:
u = db.scalar(select(User).where(User.username == "user"))
bp = db.scalar(select(Blueprint).where(Blueprint.user_id == u.id))
s_active = Server(user_id=u.id, blueprint_id=bp.id, name="active", port=27700,
rcon_password="x", actual_state="running")
s_stale = Server(user_id=u.id, blueprint_id=bp.id, name="stale", port=27701,
rcon_password="x", actual_state="running")
db.add_all([s_active, s_stale]); db.flush()
db.add(ServerLiveState(
server_id=s_active.id, started_at=now, last_seen_at=now,
players=2, max_players=4, bots=0, map="c1m2_streets", hibernating=False,
))
old = now - timedelta(minutes=5)
db.add(ServerLiveState(
server_id=s_stale.id, started_at=old, last_seen_at=old,
players=0, max_players=4, bots=0, map="c1m1_hotel", hibernating=True,
))
res = user_client.get("/servers")
html = res.get_data(as_text=True)
assert "2/4" in html
assert "c1m2_streets" in html
# stale row should not render fresh counts; just a dim placeholder
assert "c1m1_hotel" not in html or "stale" in html.lower()
```
- [ ] **Step 3: Run, verify failure**
Run: `pytest l4d2web/tests/test_server_routes.py -v -k live_state_badge`
Expected: FAIL.
- [ ] **Step 4: Update the servers index handler**
In the route handler (found in Step 1), after loading the user's servers, also load the latest snapshot per server in one query, and pass a `live_state_by_server` dict into the template. Example sketch (adapt to the existing handler shape):
```python
from datetime import datetime, timedelta, UTC
from sqlalchemy import select, func
from l4d2web.models import ServerLiveState
# ... inside the handler, after loading `servers`:
stale_seconds = current_app.config.get("LIVE_STATE_STALE_SECONDS", 30)
cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(seconds=stale_seconds)
server_ids = [s.id for s in servers]
latest_rows: dict[int, ServerLiveState] = {}
if server_ids:
# SQLite-friendly latest-per-server pattern.
subq = (
select(
ServerLiveState.server_id,
func.max(ServerLiveState.started_at).label("mx"),
)
.where(ServerLiveState.server_id.in_(server_ids))
.group_by(ServerLiveState.server_id)
.subquery()
)
rows = db.scalars(
select(ServerLiveState).join(
subq,
(ServerLiveState.server_id == subq.c.server_id)
& (ServerLiveState.started_at == subq.c.mx),
)
).all()
for r in rows:
latest_rows[r.server_id] = r
live_state_by_server = {}
for sid, row in latest_rows.items():
fresh = row.last_seen_at >= cutoff
live_state_by_server[sid] = {
"fresh": fresh,
"players": row.players,
"max_players": row.max_players,
"map": row.map,
"hibernating": row.hibernating,
}
```
Pass `live_state_by_server=live_state_by_server` to `render_template("servers.html", ...)`.
- [ ] **Step 5: Update `l4d2web/templates/servers.html`**
Within the row markup for each server, add an inline live-state cell:
```jinja
{% set ls = live_state_by_server.get(server.id) %}
<td class="server-live">
{% if server.actual_state != 'running' %}
<span class="muted"></span>
{% elif ls is none or not ls.fresh %}
<span class="muted" title="no recent data">?</span>
{% elif ls.hibernating %}
{{ ls.players }}/{{ ls.max_players }} · idle · {{ ls.map }}
{% else %}
{{ ls.players }}/{{ ls.max_players }} · {{ ls.map }}
{% endif %}
</td>
```
Place this within the existing per-server row, between or alongside the existing state/name cells — match the surrounding markup. Style class is just a hook for later CSS.
- [ ] **Step 6: Run, verify pass**
Run: `pytest l4d2web/tests/test_server_routes.py -v -k live_state_badge`
Expected: PASS.
Run: `pytest l4d2web/tests -q`
Expected: PASS.
- [ ] **Step 7: Commit**
```bash
git add l4d2web/routes/*.py l4d2web/templates/servers.html l4d2web/tests/test_server_routes.py
git commit -m "feat(servers): show live counts + map badge in server list"
```
---
## Task 11: Server-detail live-state fragment + route
**Files:**
- Modify: `l4d2web/routes/server_routes.py`
- Create: `l4d2web/templates/_live_state.html`
- Modify: `l4d2web/templates/server_detail.html`
- Modify: response-headers / CSP setup file (locate during step 1)
- Test: `l4d2web/tests/test_server_routes.py`
- [ ] **Step 1: Locate the CSP / response-headers code**
Run: `grep -rn "Content-Security-Policy\|img-src\|after_request" l4d2web/`
If a CSP is set, find the `img-src` directive — we need to add the Steam avatar CDN hosts.
- [ ] **Step 2: Write the failing fragment route test**
Append to `l4d2web/tests/test_server_routes.py`:
```python
def test_live_state_fragment_renders_current_and_recent(user_client) -> None:
now = datetime.now(UTC).replace(tzinfo=None)
with session_scope() as db:
u = db.scalar(select(User).where(User.username == "user"))
bp = db.scalar(select(Blueprint).where(Blueprint.user_id == u.id))
srv = Server(user_id=u.id, blueprint_id=bp.id, name="srv", port=27800,
rcon_password="x", actual_state="running")
db.add(srv); db.flush()
db.add(ServerLiveState(
server_id=srv.id, started_at=now, last_seen_at=now,
players=1, max_players=4, bots=0, map="c1m2_streets", hibernating=False,
))
db.add(ServerPlayerSession(
server_id=srv.id, steam_id_64="76561197960828710",
joined_at=now - timedelta(minutes=5), left_at=None,
name_at_join="Crone", min_ping=40, max_ping=60,
))
db.add(ServerPlayerSession(
server_id=srv.id, steam_id_64="76561198021234567",
joined_at=now - timedelta(hours=2), left_at=now - timedelta(hours=1),
name_at_join="OldPlayer", min_ping=20, max_ping=80,
))
db.add(SteamUserProfile(
steam_id_64="76561197960828710",
persona_name="MrCool42",
avatar_url="https://avatars.cloudflare.steamstatic.com/cur_medium.jpg",
fetched_at=now,
))
db.add(SteamUserProfile(
steam_id_64="76561198021234567",
persona_name="OldPersona",
avatar_url="https://avatars.cloudflare.steamstatic.com/old_medium.jpg",
fetched_at=now,
))
srv_id = srv.id
res = user_client.get(f"/servers/{srv_id}/live-state")
assert res.status_code == 200
html = res.get_data(as_text=True)
# Summary
assert "1/4" in html
assert "c1m2_streets" in html
# Current player block
assert "MrCool42" in html
assert "cur_medium.jpg" in html
assert "40-60" in html or "4060" in html
# Recent block — only OldPlayer, not MrCool42
assert "OldPersona" in html
assert "old_medium.jpg" in html
```
- [ ] **Step 3: Run, verify failure**
Run: `pytest l4d2web/tests/test_server_routes.py -v -k live_state_fragment`
Expected: FAIL — route 404.
- [ ] **Step 4: Add the fragment route in `l4d2web/routes/server_routes.py`**
```python
from datetime import datetime, timedelta, UTC
from flask import current_app, render_template
from sqlalchemy import select, func
from l4d2web.models import ServerLiveState, ServerPlayerSession, SteamUserProfile
@bp.get("/servers/<int:server_id>/live-state")
@require_login
def live_state_fragment(server_id: int) -> Response:
user = current_user()
assert user is not None
with session_scope() as db:
server = db.scalar(select(Server).where(
Server.id == server_id, Server.user_id == user.id,
))
if server is None:
return Response(status=404)
stale_seconds = current_app.config.get("LIVE_STATE_STALE_SECONDS", 30)
cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(seconds=stale_seconds)
latest = db.scalar(
select(ServerLiveState)
.where(ServerLiveState.server_id == server.id)
.order_by(ServerLiveState.started_at.desc())
.limit(1)
)
current_rows = db.execute(
select(ServerPlayerSession, SteamUserProfile)
.outerjoin(
SteamUserProfile,
SteamUserProfile.steam_id_64 == ServerPlayerSession.steam_id_64,
)
.where(
ServerPlayerSession.server_id == server.id,
ServerPlayerSession.left_at.is_(None),
)
.order_by(ServerPlayerSession.joined_at)
).all()
current_ids = [r[0].steam_id_64 for r in current_rows]
recent_cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(
days=current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30)
)
recent_rows = db.execute(
select(
ServerPlayerSession.steam_id_64,
func.max(ServerPlayerSession.left_at).label("last_seen"),
ServerPlayerSession.name_at_join,
SteamUserProfile.persona_name,
SteamUserProfile.avatar_url,
)
.outerjoin(
SteamUserProfile,
SteamUserProfile.steam_id_64 == ServerPlayerSession.steam_id_64,
)
.where(
ServerPlayerSession.server_id == server.id,
ServerPlayerSession.left_at.is_not(None),
ServerPlayerSession.left_at >= recent_cutoff,
~ServerPlayerSession.steam_id_64.in_(current_ids) if current_ids else True,
)
.group_by(
ServerPlayerSession.steam_id_64,
SteamUserProfile.persona_name,
SteamUserProfile.avatar_url,
ServerPlayerSession.name_at_join,
)
.order_by(func.max(ServerPlayerSession.left_at).desc())
.limit(20)
).all()
return render_template(
"_live_state.html",
server=server,
snapshot=latest,
snapshot_fresh=(latest is not None and latest.last_seen_at >= cutoff),
current_players=current_rows,
recent_players=recent_rows,
now=datetime.now(UTC).replace(tzinfo=None),
poll_seconds=current_app.config.get("LIVE_STATE_POLL_SECONDS", 5),
)
```
- [ ] **Step 5: Create `l4d2web/templates/_live_state.html`**
```jinja
<section class="panel live-state"
hx-get="/servers/{{ server.id }}/live-state"
hx-trigger="every {{ poll_seconds }}s"
hx-swap="outerHTML">
<h2 class="section-title">Live state</h2>
{% if not snapshot or not snapshot_fresh %}
<p class="muted">No data — server is not currently reporting.</p>
{% else %}
<p class="server-live-summary">
{{ snapshot.players }}/{{ snapshot.max_players }}
{% if snapshot.hibernating %}· idle{% endif %}
· {{ snapshot.map }}
<small class="muted">
polled {{ ((now - snapshot.last_seen_at).total_seconds() | int) }}s ago
</small>
</p>
{% endif %}
{% if current_players %}
<h3 class="section-subtitle">Current players</h3>
<ul class="player-grid">
{% for session, profile in current_players %}
<li class="player-card">
{% if profile and profile.avatar_url %}
<img class="avatar" src="{{ profile.avatar_url }}" alt="" loading="lazy">
{% else %}
<span class="avatar placeholder" aria-hidden="true"></span>
{% endif %}
<span class="name">{{ (profile and profile.persona_name) or session.name_at_join }}</span>
<span class="meta">
joined {{ ((now - session.joined_at).total_seconds() // 60) | int }}m ago
· ping {{ session.min_ping }}-{{ session.max_ping }}ms
</span>
</li>
{% endfor %}
</ul>
{% endif %}
{% if recent_players %}
<h3 class="section-subtitle">Recent players</h3>
<ul class="player-grid recent">
{% for row in recent_players %}
<li class="player-card">
{% if row.avatar_url %}
<img class="avatar" src="{{ row.avatar_url }}" alt="" loading="lazy">
{% else %}
<span class="avatar placeholder" aria-hidden="true"></span>
{% endif %}
<span class="name">{{ row.persona_name or row.name_at_join }}</span>
<span class="meta">
last seen {{ ((now - row.last_seen).total_seconds() // 60) | int }}m ago
</span>
</li>
{% endfor %}
</ul>
{% endif %}
</section>
```
- [ ] **Step 6: Include the partial in `l4d2web/templates/server_detail.html`**
Below the existing actions/log sections, add:
```jinja
{% include "_live_state.html" %}
```
The route always returns the section with its HTMX wrapper, so the include and the fragment endpoint produce identical markup.
- [ ] **Step 7: Extend the CSP**
Find the existing CSP / response-headers code (located in Step 1). Add to the `img-src` directive:
```
'self'
https://avatars.cloudflare.steamstatic.com
https://avatars.akamai.steamstatic.com
https://avatars.steamstatic.com
https://steamuserimages-a.akamaihd.net
```
If no CSP exists, skip this step and note it in the PR description so a follow-up can address it.
- [ ] **Step 8: Run, verify pass**
Run: `pytest l4d2web/tests/test_server_routes.py -v -k live_state_fragment`
Expected: PASS.
Run: `pytest l4d2web/tests -q`
Expected: PASS.
- [ ] **Step 9: Commit**
```bash
git add l4d2web/routes/server_routes.py l4d2web/templates/_live_state.html \
l4d2web/templates/server_detail.html l4d2web/tests/test_server_routes.py
# include the CSP file if you edited one
git commit -m "feat(servers): add live-state panel with current and recent players"
```
---
## Task 12: Deploy template — Steam API key in `web.env`
**Files:**
- Modify: `deploy/templates/etc/left4me/web.env`
- [ ] **Step 1: Add the new var to the env template**
Open `deploy/templates/etc/left4me/web.env`. Append:
```
# Steam Web API key for ISteamUser/GetPlayerSummaries — used to resolve
# player Steam IDs to persona names + avatars in the server live-state
# panel. Free at https://steamcommunity.com/dev/apikey. Optional: if
# empty, the live-state panel still shows counts/map and the in-game
# name from RCON, just with placeholder avatars.
STEAM_WEB_API_KEY=
```
If `web.env` is templated via `envsubst` or similar at deploy time, also add `STEAM_WEB_API_KEY` to the build-side variable list.
- [ ] **Step 2: Commit**
```bash
git add deploy/templates/etc/left4me/web.env
git commit -m "deploy: add STEAM_WEB_API_KEY to web.env template"
```
---
## Task 13: Smoke test against prod (manual)
No code changes. Verification only.
- [ ] **Step 1: Full suite + lint**
Run: `pytest l4d2web/tests -q`
Expected: PASS.
If `ruff` or similar is wired into CI, run it too.
- [ ] **Step 2: Deploy to `left4.me`**
Follow the standard deploy path (`deploy/deploy-test-server.sh left4.me` or whatever the project uses). After deploy:
```bash
ssh left4.me 'systemctl status left4me-web.service'
ssh left4.me 'journalctl -u left4me-web.service -n 50 --no-pager'
```
Expected: service is `active (running)`, no crash-loop, journal mentions the live-state poller starting.
- [ ] **Step 3: Confirm migration backfilled passwords**
```bash
ssh left4.me 'sudo -u left4me sqlite3 /var/lib/left4me/l4d2web.db \
"SELECT id, name, length(rcon_password) FROM servers"'
```
Expected: every row has `length >= 32`.
- [ ] **Step 4: Re-initialize and restart each existing server**
In the web UI, click `initialize` then `start` for each existing server. (Or the equivalent from `l4d2ctl`.)
Confirm `rcon_password "..."` is the last non-blank line in each `server.cfg`:
```bash
ssh left4.me 'sudo cat /opt/l4d2/instances/<name>/server.cfg | tail -5'
```
- [ ] **Step 5: Verify poller is writing**
After ~10s:
```bash
ssh left4.me 'sudo -u left4me sqlite3 /var/lib/left4me/l4d2web.db \
"SELECT server_id, started_at, last_seen_at, players, map, hibernating \
FROM server_live_state ORDER BY server_id, started_at DESC LIMIT 6"'
```
Expected: one fresh row per server. `last_seen_at` is recent.
- [ ] **Step 6: Join a server from the L4D2 client**
Connect to one of the prod servers. Within 5-10s:
- `/servers` shows the badge change to `1/4 · <map>`.
- `/servers/<id>` shows your avatar + persona name + ping range in the "Current players" block.
Disconnect. Within 5-10s:
- Badge returns to `0/4 · idle · <map>`.
- Detail page card moves from "Current" to "Recent players."
- [ ] **Step 7: Verify sessions table state**
```bash
ssh left4.me 'sudo -u left4me sqlite3 /var/lib/left4me/l4d2web.db \
"SELECT steam_id_64, name_at_join, joined_at, left_at, min_ping, max_ping \
FROM server_player_session ORDER BY joined_at DESC LIMIT 5"'
```
Expected: one row per connection. `left_at` is non-null after disconnect; `min_ping <= max_ping`.
- [ ] **Step 8: Failure-path checks**
a. Set `STEAM_WEB_API_KEY=` (empty) in `/etc/left4me/web.env`, restart service. Confirm the UI still renders (in-game names, placeholder avatars) and the journal has no stack traces.
b. Edit one server's `rcon_password` in the DB to garbage. Confirm the journal logs "rcon query failed" and the badge goes stale; other servers are unaffected.
c. Wait for the stuck-session threshold to pass. Confirm any open sessions for the broken server get closed.
---
## Self-Review
**Spec coverage**: Schema (3 tables + 1 column) → Task 1. RCON client → Task 2. Spec injection → Task 3. Password generation → Task 4. Steam API → Task 5. Poller (RLE / sessions / enrichment / retention / startup) → Tasks 6-9. UI (list / detail) → Tasks 10-11. Deploy env → Task 12. Verification → Task 13.
**Type consistency**: `StatusResponse`, `PlayerRow`, `SteamProfile`, `ServerLiveState`, `ServerPlayerSession`, `SteamUserProfile` are referenced identically across tasks. `query_status`, `fetch_profiles_batch`, `start_live_state_poller`, `poll_once`, `prune_history` keep their signatures throughout. Config keys (`LIVE_STATE_POLL_SECONDS`, `STEAM_PROFILE_TTL_SECONDS`, `STUCK_SESSION_SECONDS`, `STEAM_WEB_API_KEY`, `LIVE_STATE_HISTORY_DAYS`, `LIVE_STATE_QUERY_TIMEOUT_SECONDS`, `LIVE_STATE_STALE_SECONDS`, `LIVE_STATE_RETENTION_EVERY_TICKS`) are referenced consistently.
**Open items deferred to implementation time** (called out explicitly in the relevant tasks rather than left as TBDs):
- Exact CSP wiring location (Task 11 Step 1)
- Whether `l4d2web/config.py` auto-loads env vars or needs explicit registration (Task 9 Step 4)
- Existing test fixtures (`user_client`, default blueprint seed) — reuse if present (Tasks 4, 10, 11)
- Servers-index handler location (Task 10 Step 1)
- Migration test layout — extend existing `test_migrations.py` if it exists, otherwise create (Task 1 Step 5)
**No placeholders or TODOs** in test code or implementation.