left4me/docs/superpowers/plans/2026-05-12-server-live-state-display.md
mwiegand a5f7b736a2
docs/plan: server live-state display implementation plan
Thirteen TDD-structured tasks covering schema migration, RCON client,
spec injection, password generation, Steam Web API client, live-state
poller (RLE snapshots + session reconciliation + profile enrichment +
retention + thread startup), server list badge, server detail
fragment, deploy env, and end-to-end smoke.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 21:10:33 +02:00

87 KiB
Raw Blame History

Server Live-State Display Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add a read-side live-state display to the web UI — counts/map/hibernating on the server list, plus a server-detail panel with current + recent players (avatars from Steam) — backed by a persistent history table.

Architecture: A daemon-thread poller (modeled on the existing start_state_poller in l4d2web/services/job_worker.py:617-647) polls each running game server's RCON port every 5s, writes run-length-encoded snapshots to server_live_state, maintains player session intervals in server_player_session, and enriches Steam IDs via the Steam Web API into steam_user_profile (24h cache). RCON passwords are per-server, auto-generated, and injected into server.cfg via the existing spec yaml pipeline — no host-side changes.

Tech Stack: Python 3.12+, Flask, SQLAlchemy 2.x, Alembic, requests (already used by steam_workshop.py), pure-stdlib socket/struct for the RCON protocol, HTMX for the auto-refreshing detail panel, pytest.

Spec: docs/superpowers/specs/2026-05-12-server-live-state-display-design.md


File Structure

Create:

  • l4d2web/alembic/versions/0010_server_live_state.py — schema migration: one new column on servers, three new tables, password backfill for existing rows
  • l4d2web/services/rcon.py — Source RCON client + status parser
  • l4d2web/services/steam_users.py — Steam Web API client (mirrors l4d2web/services/steam_workshop.py:17-43)
  • l4d2web/services/live_state_poller.py — daemon thread + poll loop + RLE snapshot + session reconciler
  • l4d2web/templates/_live_state.html — HTMX-refreshed fragment with summary + current + recent blocks
  • l4d2web/tests/test_rcon.py, l4d2web/tests/test_steam_users.py, l4d2web/tests/test_live_state_poller.py

Modify:

  • l4d2web/models.pyServer.rcon_password column + three new model classes
  • l4d2web/services/l4d2_facade.py:28-52build_server_spec_payload appends rcon_password "..."
  • l4d2web/routes/server_routes.py:58-83create_server generates a password; add live_state_fragment route
  • l4d2web/templates/servers.html — inline badge column per server row
  • l4d2web/templates/server_detail.html — include _live_state.html inside an HTMX wrapper
  • l4d2web/app.py — call start_live_state_poller(app) next to existing start_state_poller
  • l4d2web/config.py — register seven new config keys
  • deploy/templates/etc/left4me/web.env — add STEAM_WEB_API_KEY=
  • CSP / response headers (find existing setup during Task 11) — add avatars.*.steamstatic.com to img-src

Reused (no changes):

  • l4d2web/services/job_worker.py:617-647 — pattern reference for daemon-thread / poll-loop
  • l4d2web/services/steam_workshop.py:17-43 — pattern reference for the Steam Web API client (requests.Session, 30s timeout, threaded executor)
  • l4d2host/instances.py:40-58 — already writes spec.config verbatim to server.cfg, so injected rcon_password line lands automatically
  • l4d2web/templates/_overlay_build_status.html:1-5 — HTMX polling pattern reference

Task 1: Schema migration + ORM models

Files:

  • Create: l4d2web/alembic/versions/0010_server_live_state.py

  • Modify: l4d2web/models.py

  • Test: l4d2web/tests/test_models.py (extend or create), l4d2web/tests/test_migrations.py (extend or create — check existing test layout)

  • Step 1: Write the failing model + migration tests

Add to l4d2web/tests/test_models.py (or create it):

import secrets

from sqlalchemy import select

from l4d2web.db import init_db, session_scope
from l4d2web.models import (
    Server,
    ServerLiveState,
    ServerPlayerSession,
    SteamUserProfile,
    User,
    Blueprint,
)


def test_server_has_rcon_password_column(tmp_path, monkeypatch) -> None:
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
    init_db()
    with session_scope() as db:
        u = User(username="u", password_digest="x")
        db.add(u); db.flush()
        bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]")
        db.add(bp); db.flush()
        s = Server(
            user_id=u.id, blueprint_id=bp.id, name="s", port=27500,
            rcon_password="abc",
        )
        db.add(s); db.flush()
        assert db.scalar(select(Server.rcon_password).where(Server.id == s.id)) == "abc"


def test_server_live_state_table_columns(tmp_path, monkeypatch) -> None:
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
    init_db()
    # Smoke check the table exists with the expected columns by inserting a row.
    with session_scope() as db:
        u = User(username="u", password_digest="x"); db.add(u); db.flush()
        bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
        s = Server(user_id=u.id, blueprint_id=bp.id, name="s", port=27501, rcon_password="x")
        db.add(s); db.flush()
        row = ServerLiveState(
            server_id=s.id, started_at=now_utc_aware(), last_seen_at=now_utc_aware(),
            players=2, max_players=4, bots=0, map="c1m1_hotel", hibernating=False,
        )
        db.add(row); db.flush()


def test_server_player_session_table_columns(tmp_path, monkeypatch) -> None:
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
    init_db()
    with session_scope() as db:
        u = User(username="u", password_digest="x"); db.add(u); db.flush()
        bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
        s = Server(user_id=u.id, blueprint_id=bp.id, name="s", port=27502, rcon_password="x")
        db.add(s); db.flush()
        row = ServerPlayerSession(
            server_id=s.id, steam_id_64="76561197960828710",
            joined_at=now_utc_aware(), left_at=None,
            name_at_join="Crone", min_ping=42, max_ping=185,
        )
        db.add(row); db.flush()


def test_steam_user_profile_table_columns(tmp_path, monkeypatch) -> None:
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'t.db'}")
    init_db()
    with session_scope() as db:
        row = SteamUserProfile(
            steam_id_64="76561197960828710",
            persona_name="MrCool42",
            avatar_url="https://avatars.cloudflare.steamstatic.com/abc_medium.jpg",
            fetched_at=now_utc_aware(),
        )
        db.add(row); db.flush()

Add this helper next to the imports (or import now_utc from models):

from l4d2web.models import now_utc as now_utc_aware
  • Step 2: Run model tests to verify they fail

Run: pytest l4d2web/tests/test_models.py -v Expected: FAIL — ImportError: cannot import name 'ServerLiveState' (or column missing).

  • Step 3: Add models to l4d2web/models.py

After the existing Server class (around line 137), add the new column to Server:

class Server(Base):
    __tablename__ = "servers"
    __table_args__ = (
        UniqueConstraint("user_id", "name", name="uq_servers_user_name"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    blueprint_id: Mapped[int] = mapped_column(ForeignKey("blueprints.id"), nullable=False)
    name: Mapped[str] = mapped_column(String(128), nullable=False)
    port: Mapped[int] = mapped_column(Integer, unique=True, nullable=False)
    desired_state: Mapped[str] = mapped_column(String(16), default="stopped", nullable=False)
    actual_state: Mapped[str] = mapped_column(String(16), default="unknown", nullable=False)
    actual_state_updated_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    last_error: Mapped[str] = mapped_column(Text, default="", nullable=False)
    rcon_password: Mapped[str] = mapped_column(
        String(64), nullable=False, default="", server_default=""
    )
    created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)

Then add at the end of the file (after the last model):

class ServerLiveState(Base):
    __tablename__ = "server_live_state"
    __table_args__ = (
        Index("ix_sls_server_started", "server_id", "started_at"),
        {"sqlite_autoincrement": True},
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    server_id: Mapped[int] = mapped_column(
        ForeignKey("servers.id", ondelete="CASCADE"), nullable=False
    )
    started_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    last_seen_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    players: Mapped[int] = mapped_column(Integer, nullable=False)
    max_players: Mapped[int] = mapped_column(Integer, nullable=False)
    bots: Mapped[int] = mapped_column(Integer, nullable=False)
    map: Mapped[str] = mapped_column(String(64), nullable=False)
    hibernating: Mapped[bool] = mapped_column(Boolean, nullable=False)


class ServerPlayerSession(Base):
    __tablename__ = "server_player_session"
    __table_args__ = (
        Index("ix_sps_server_open", "server_id", "left_at"),
        Index("ix_sps_server_recent", "server_id", "left_at"),
        Index("ix_sps_steam_history", "steam_id_64", "joined_at"),
        {"sqlite_autoincrement": True},
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    server_id: Mapped[int] = mapped_column(
        ForeignKey("servers.id", ondelete="CASCADE"), nullable=False
    )
    steam_id_64: Mapped[str] = mapped_column(String(20), nullable=False)
    joined_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    left_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    name_at_join: Mapped[str] = mapped_column(String(64), nullable=False)
    min_ping: Mapped[int] = mapped_column(Integer, nullable=False)
    max_ping: Mapped[int] = mapped_column(Integer, nullable=False)


class SteamUserProfile(Base):
    __tablename__ = "steam_user_profile"

    steam_id_64: Mapped[str] = mapped_column(String(20), primary_key=True)
    persona_name: Mapped[str] = mapped_column(String(64), nullable=False)
    avatar_url: Mapped[str] = mapped_column(Text, nullable=False)
    fetched_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
  • Step 4: Create the migration file

Create l4d2web/alembic/versions/0010_server_live_state.py:

"""server_live_state schema

Revision ID: 0010_server_live_state
Revises: 0009_user_password_changed_at
Create Date: 2026-05-12
"""
from __future__ import annotations

import secrets
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op


revision: str = "0010_server_live_state"
down_revision: Union[str, Sequence[str], None] = "0009_user_password_changed_at"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # 1. Add rcon_password to servers, default ''
    with op.batch_alter_table("servers") as batch:
        batch.add_column(
            sa.Column(
                "rcon_password",
                sa.String(length=64),
                nullable=False,
                server_default="",
            )
        )

    # 2. Backfill every existing row with a freshly generated password.
    conn = op.get_bind()
    rows = conn.execute(sa.text("SELECT id FROM servers")).fetchall()
    for row in rows:
        conn.execute(
            sa.text("UPDATE servers SET rcon_password = :pw WHERE id = :id"),
            {"pw": secrets.token_urlsafe(32), "id": row.id},
        )

    # 3. server_live_state — run-length-encoded snapshots
    op.create_table(
        "server_live_state",
        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
        sa.Column(
            "server_id",
            sa.Integer(),
            sa.ForeignKey("servers.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("started_at", sa.DateTime(), nullable=False),
        sa.Column("last_seen_at", sa.DateTime(), nullable=False),
        sa.Column("players", sa.Integer(), nullable=False),
        sa.Column("max_players", sa.Integer(), nullable=False),
        sa.Column("bots", sa.Integer(), nullable=False),
        sa.Column("map", sa.String(length=64), nullable=False),
        sa.Column("hibernating", sa.Boolean(), nullable=False),
        sqlite_autoincrement=True,
    )
    op.create_index(
        "ix_sls_server_started",
        "server_live_state",
        ["server_id", "started_at"],
    )

    # 4. server_player_session — connection intervals
    op.create_table(
        "server_player_session",
        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
        sa.Column(
            "server_id",
            sa.Integer(),
            sa.ForeignKey("servers.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("steam_id_64", sa.String(length=20), nullable=False),
        sa.Column("joined_at", sa.DateTime(), nullable=False),
        sa.Column("left_at", sa.DateTime(), nullable=True),
        sa.Column("name_at_join", sa.String(length=64), nullable=False),
        sa.Column("min_ping", sa.Integer(), nullable=False),
        sa.Column("max_ping", sa.Integer(), nullable=False),
        sqlite_autoincrement=True,
    )
    op.create_index("ix_sps_server_open", "server_player_session", ["server_id", "left_at"])
    op.create_index("ix_sps_server_recent", "server_player_session", ["server_id", "left_at"])
    op.create_index(
        "ix_sps_steam_history", "server_player_session", ["steam_id_64", "joined_at"]
    )

    # 5. steam_user_profile — 24h profile cache
    op.create_table(
        "steam_user_profile",
        sa.Column("steam_id_64", sa.String(length=20), primary_key=True),
        sa.Column("persona_name", sa.String(length=64), nullable=False),
        sa.Column("avatar_url", sa.Text(), nullable=False),
        sa.Column("fetched_at", sa.DateTime(), nullable=False),
    )


def downgrade() -> None:
    op.drop_table("steam_user_profile")
    op.drop_index("ix_sps_steam_history", table_name="server_player_session")
    op.drop_index("ix_sps_server_recent", table_name="server_player_session")
    op.drop_index("ix_sps_server_open", table_name="server_player_session")
    op.drop_table("server_player_session")
    op.drop_index("ix_sls_server_started", table_name="server_live_state")
    op.drop_table("server_live_state")
    with op.batch_alter_table("servers") as batch:
        batch.drop_column("rcon_password")
  • Step 5: Add a migration-applies-and-backfills test

Append to l4d2web/tests/test_migrations.py (create it if absent):

import secrets
import subprocess
from pathlib import Path

import pytest
import sqlalchemy as sa
from sqlalchemy import create_engine, inspect


def test_migration_0010_backfills_rcon_password(tmp_path: Path, monkeypatch) -> None:
    db_path = tmp_path / "t.db"
    db_url = f"sqlite:///{db_path}"
    monkeypatch.setenv("DATABASE_URL", db_url)

    # Run alembic up to 0009 only, then seed two servers, then upgrade to 0010.
    repo_root = Path(__file__).resolve().parents[2]
    env = {"DATABASE_URL": db_url, "PATH": "/usr/bin:/bin"}

    subprocess.run(
        ["alembic", "upgrade", "0009_user_password_changed_at"],
        cwd=repo_root, env={**env}, check=True,
    )

    engine = create_engine(db_url)
    with engine.begin() as conn:
        conn.execute(sa.text(
            "INSERT INTO users (id, username, password_digest, admin, active, "
            "created_at, updated_at, password_changed_at) "
            "VALUES (1, 'u', 'x', 0, 1, '2026-01-01', '2026-01-01', '2026-01-01')"
        ))
        conn.execute(sa.text(
            "INSERT INTO blueprints (id, user_id, name, arguments, config, "
            "created_at, updated_at) "
            "VALUES (1, 1, 'bp', '[]', '[]', '2026-01-01', '2026-01-01')"
        ))
        conn.execute(sa.text(
            "INSERT INTO servers (id, user_id, blueprint_id, name, port, "
            "desired_state, actual_state, last_error, created_at, updated_at) "
            "VALUES (1, 1, 1, 's1', 27600, 'stopped', 'unknown', '', "
            "'2026-01-01', '2026-01-01')"
        ))
        conn.execute(sa.text(
            "INSERT INTO servers (id, user_id, blueprint_id, name, port, "
            "desired_state, actual_state, last_error, created_at, updated_at) "
            "VALUES (2, 1, 1, 's2', 27601, 'stopped', 'unknown', '', "
            "'2026-01-01', '2026-01-01')"
        ))

    subprocess.run(
        ["alembic", "upgrade", "0010_server_live_state"],
        cwd=repo_root, env={**env}, check=True,
    )

    with engine.connect() as conn:
        rows = conn.execute(sa.text("SELECT id, rcon_password FROM servers ORDER BY id")).fetchall()

    assert len(rows) == 2
    for row in rows:
        assert len(row.rcon_password) >= 32
        assert row.rcon_password.replace("_", "").replace("-", "").isalnum()

    inspector = inspect(engine)
    assert "server_live_state" in inspector.get_table_names()
    assert "server_player_session" in inspector.get_table_names()
    assert "steam_user_profile" in inspector.get_table_names()

If the existing test layout already has a migrations fixture, prefer it. Confirm by skimming l4d2web/tests/conftest.py and any test_migrations.py once before writing this.

  • Step 6: Run all the new tests, verify pass

Run: pytest l4d2web/tests/test_models.py l4d2web/tests/test_migrations.py -v Expected: PASS.

  • Step 7: Full-suite regression

Run: pytest l4d2web/tests -q Expected: PASS (no regressions).

  • Step 8: Commit
git add l4d2web/alembic/versions/0010_server_live_state.py l4d2web/models.py \
        l4d2web/tests/test_models.py l4d2web/tests/test_migrations.py
git commit -m "feat(live-state): add schema for snapshots, sessions, steam profiles"

Task 2: RCON client — protocol handshake + auth

Files:

  • Create: l4d2web/services/rcon.py

  • Test: l4d2web/tests/test_rcon.py

  • Step 1: Write the failing handshake tests

Create l4d2web/tests/test_rcon.py:

"""Source RCON client tests against an in-process TCP fixture.

The handshake quirk we verified live: after a SERVERDATA_AUTH (type=3) the
server sends a SERVERDATA_RESPONSE_VALUE (type=0) FIRST and THEN the
SERVERDATA_AUTH_RESPONSE (type=2). The auth response's req_id == -1 means
bad password. The client must consume both packets before sending the
command.
"""
from __future__ import annotations

import socket
import struct
import threading
from contextlib import contextmanager
from typing import Iterator

import pytest

from l4d2web.services.rcon import (
    RconAuthError,
    RconError,
    query_status,
)


def _pack(req_id: int, ptype: int, body: str) -> bytes:
    body_bytes = body.encode("utf-8") + b"\x00\x00"
    size = 4 + 4 + len(body_bytes)
    return struct.pack("<iii", size, req_id, ptype) + body_bytes


def _unpack_one(conn: socket.socket) -> tuple[int, int, str]:
    raw_size = conn.recv(4)
    size = struct.unpack("<i", raw_size)[0]
    payload = b""
    while len(payload) < size:
        payload += conn.recv(size - len(payload))
    req_id, ptype = struct.unpack("<ii", payload[:8])
    body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
    return req_id, ptype, body


@contextmanager
def fake_rcon_server(handler) -> Iterator[int]:
    """Start a TCP server on an ephemeral port; handler(conn) runs in a thread."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    port = server.getsockname()[1]
    server.settimeout(3.0)

    def serve() -> None:
        try:
            conn, _ = server.accept()
            try:
                handler(conn)
            finally:
                conn.close()
        except Exception:
            pass

    t = threading.Thread(target=serve, daemon=True)
    t.start()
    try:
        yield port
    finally:
        server.close()
        t.join(timeout=1.0)


def test_auth_success_then_status(monkeypatch: pytest.MonkeyPatch) -> None:
    response_body = (
        "hostname: Left 4 Dead 2\n"
        "version : 2.2.4.3 9309 secure  (unknown)\n"
        "udp/ip  : 127.0.0.1:27016 [ public 1.2.3.4:27016 ]\n"
        "os      : Linux Dedicated\n"
        "map     : c1m2_streets\n"
        "players : 1 humans, 0 bots (4 max) (not hibernating) (reserved 1860000e7e5e446)\n"
        "\n"
        "# userid name uniqueid connected ping loss state rate adr\n"
        '#  2 1 "Crone" STEAM_1:0:12376499 00:21 185 20 active 30000 91.55.5.100:27005\n'
        "#end\n"
    )

    def handler(conn: socket.socket) -> None:
        req_id, ptype, body = _unpack_one(conn)
        assert ptype == 3
        assert body == "letmein"
        conn.sendall(_pack(req_id, 0, ""))
        conn.sendall(_pack(req_id, 2, ""))
        cmd_id, cmd_type, cmd = _unpack_one(conn)
        assert cmd_type == 2
        assert cmd == "status"
        conn.sendall(_pack(cmd_id, 0, response_body))

    with fake_rcon_server(handler) as port:
        result = query_status("127.0.0.1", port, "letmein", timeout=2.0)

    assert result.map == "c1m2_streets"
    assert result.players == 1
    assert result.bots == 0
    assert result.max_players == 4
    assert result.hibernating is False
    assert len(result.roster) == 1
    p = result.roster[0]
    assert p.name == "Crone"
    assert p.steam_id_64 == "76561197985018726"  # 76561197960265728 + 0 + 12376499*2
    assert p.connected_seconds == 21
    assert p.ping == 185


def test_auth_failure_raises(monkeypatch: pytest.MonkeyPatch) -> None:
    def handler(conn: socket.socket) -> None:
        req_id, _, _ = _unpack_one(conn)
        conn.sendall(_pack(req_id, 0, ""))
        conn.sendall(_pack(-1, 2, ""))  # bad password sentinel

    with fake_rcon_server(handler) as port:
        with pytest.raises(RconAuthError):
            query_status("127.0.0.1", port, "wrong", timeout=2.0)


def test_timeout_raises(monkeypatch: pytest.MonkeyPatch) -> None:
    def handler(conn: socket.socket) -> None:
        import time
        time.sleep(3.0)

    with fake_rcon_server(handler) as port:
        with pytest.raises(RconError):
            query_status("127.0.0.1", port, "x", timeout=0.3)
  • Step 2: Run the failing tests

Run: pytest l4d2web/tests/test_rcon.py -v Expected: FAIL — ModuleNotFoundError: No module named 'l4d2web.services.rcon'.

  • Step 3: Implement l4d2web/services/rcon.py
"""Source RCON client + status parser.

Pure stdlib. One TCP connection per query — fine at our scale (loopback
~10-20ms round-trip; pooling not worth the complexity).

Source RCON wire format:
  size  : little-endian int32 (count of the bytes that follow)
  req_id: little-endian int32
  ptype : little-endian int32
  body  : utf-8 string, null-terminated
  pad   : one extra null byte

Packet types:
  SERVERDATA_AUTH         = 3   (client -> server)
  SERVERDATA_EXECCOMMAND  = 2   (client -> server)
  SERVERDATA_AUTH_RESPONSE= 2   (server -> client)
  SERVERDATA_RESPONSE_VALUE = 0 (server -> client)

After auth, the server sends a type=0 empty packet *first* and then the
type=2 auth response. req_id == -1 on the auth response = bad password.
"""
from __future__ import annotations

import re
import socket
import struct
from dataclasses import dataclass
from typing import Iterable


SERVERDATA_AUTH = 3
SERVERDATA_EXECCOMMAND = 2
SERVERDATA_AUTH_RESPONSE = 2
SERVERDATA_RESPONSE_VALUE = 0

_STEAM_ID_BASE = 76561197960265728


class RconError(Exception):
    """Network, timeout, or protocol error."""


class RconAuthError(RconError):
    """The server rejected the password."""


@dataclass(slots=True, frozen=True)
class PlayerRow:
    steam_id_64: str
    name: str
    connected_seconds: int
    ping: int


@dataclass(slots=True, frozen=True)
class StatusResponse:
    map: str
    players: int
    max_players: int
    bots: int
    hibernating: bool
    roster: list[PlayerRow]


def query_status(
    host: str, port: int, password: str, *, timeout: float = 2.0
) -> StatusResponse:
    """Connect to the RCON port, authenticate, send `status`, return parsed result."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        try:
            sock.connect((host, port))
        except OSError as exc:
            raise RconError(f"connect failed: {exc}") from exc

        try:
            _send_packet(sock, 1, SERVERDATA_AUTH, password)
            # Drain the leading empty type-0 packet; then read the real auth response.
            r1 = _recv_packet(sock)
            r2 = _recv_packet(sock)
            auth = r2 if r1[1] == SERVERDATA_RESPONSE_VALUE else r1
            if auth[0] == -1:
                raise RconAuthError("bad rcon password")

            _send_packet(sock, 2, SERVERDATA_EXECCOMMAND, "status")
            _, _, body = _recv_packet(sock)
        except (OSError, socket.timeout) as exc:
            raise RconError(f"rcon i/o error: {exc}") from exc
    finally:
        sock.close()

    return parse_status(body)


def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None:
    body_bytes = body.encode("utf-8") + b"\x00\x00"
    size = 4 + 4 + len(body_bytes)
    sock.sendall(struct.pack("<iii", size, req_id, ptype) + body_bytes)


def _recv_packet(sock: socket.socket) -> tuple[int, int, str]:
    size = struct.unpack("<i", _recvall(sock, 4))[0]
    payload = _recvall(sock, size)
    req_id, ptype = struct.unpack("<ii", payload[:8])
    body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
    return req_id, ptype, body


def _recvall(sock: socket.socket, n: int) -> bytes:
    data = b""
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            raise RconError("rcon connection closed")
        data += chunk
    return data


# --- Status parsing -------------------------------------------------------

_MAP_RE = re.compile(r"^map\s*:\s*(\S+)", re.MULTILINE)
_PLAYERS_RE = re.compile(
    r"^players\s*:\s*(\d+)\s+humans,\s*(\d+)\s+bots\s*\((\d+)\s+max\)"
    r"\s*\((not hibernating|hibernating)\)",
    re.MULTILINE,
)
# A status player row: starts with `#`, then variable numeric prefixes,
# then a quoted name, then STEAM_X:Y:Z, then connected time, then ping.
_PLAYER_RE = re.compile(
    r'^#\s+(?:\d+\s+)+"(?P<name>[^"]*)"\s+'
    r"(?P<sid>STEAM_\d+:(?P<y>\d+):(?P<z>\d+))\s+"
    r"(?P<connected>[\d:]+)\s+"
    r"(?P<ping>\d+)\s+",
    re.MULTILINE,
)


def parse_status(body: str) -> StatusResponse:
    map_match = _MAP_RE.search(body)
    if not map_match:
        raise RconError(f"status: no map line in response\n{body!r}")
    players_match = _PLAYERS_RE.search(body)
    if not players_match:
        raise RconError(f"status: no players line\n{body!r}")

    roster: list[PlayerRow] = []
    for m in _PLAYER_RE.finditer(body):
        y = int(m.group("y"))
        z = int(m.group("z"))
        roster.append(
            PlayerRow(
                steam_id_64=str(_STEAM_ID_BASE + (z * 2) + y),
                name=m.group("name"),
                connected_seconds=_parse_duration(m.group("connected")),
                ping=int(m.group("ping")),
            )
        )

    return StatusResponse(
        map=map_match.group(1),
        players=int(players_match.group(1)),
        bots=int(players_match.group(2)),
        max_players=int(players_match.group(3)),
        hibernating=(players_match.group(4) == "hibernating"),
        roster=roster,
    )


def _parse_duration(text: str) -> int:
    """Parse Source's connected duration: HH:MM:SS or MM:SS -> seconds."""
    parts = [int(p) for p in text.split(":")]
    if len(parts) == 2:
        return parts[0] * 60 + parts[1]
    if len(parts) == 3:
        return parts[0] * 3600 + parts[1] * 60 + parts[2]
    raise RconError(f"unparseable connected duration: {text!r}")

Note on Steam ID conversion: STEAM_X:Y:Z → SteamID64 = 76561197960265728 + Z*2 + Y. The earlier inline math in this plan said Y*2 + Z; the correct formula is Z*2 + Y (Y is the lowest bit). The test expectation 76561197985018726 reflects the correct math for STEAM_1:0:12376499.

  • Step 4: Run the RCON tests, verify pass

Run: pytest l4d2web/tests/test_rcon.py -v Expected: PASS.

  • Step 5: Commit
git add l4d2web/services/rcon.py l4d2web/tests/test_rcon.py
git commit -m "feat(rcon): add Source RCON client + status parser"

Task 3: Append RCON password to spec yaml

Files:

  • Modify: l4d2web/services/l4d2_facade.py:28-52

  • Test: l4d2web/tests/test_l4d2_facade.py

  • Step 1: Write the failing facade tests

Append to l4d2web/tests/test_l4d2_facade.py:

from l4d2web.models import Blueprint, Server
from l4d2web.services.l4d2_facade import build_server_spec_payload


def _make_server_blueprint(rcon: str = "") -> tuple[Server, Blueprint]:
    bp = Blueprint(
        id=1, user_id=1, name="bp",
        arguments='["-tickrate","60"]',
        config='["sv_consistency 1","mp_gamemode coop"]',
    )
    srv = Server(
        id=1, user_id=1, blueprint_id=1, name="s", port=27500,
        rcon_password=rcon,
    )
    return srv, bp


def test_build_server_spec_payload_appends_rcon_password_last() -> None:
    srv, bp = _make_server_blueprint(rcon="topsecret123")
    overlays = [(7, "/overlays/7", True), (8, "/overlays/8", False)]
    spec = build_server_spec_payload(srv, bp, overlays)

    cfg = spec["config"]
    # rcon_password line is the LAST entry — overlay exec lines + blueprint
    # config + rcon_password.
    assert cfg[-1] == 'rcon_password "topsecret123"'
    # Lines before our injection still contain the blueprint config.
    assert "sv_consistency 1" in cfg
    assert "mp_gamemode coop" in cfg


def test_build_server_spec_payload_omits_rcon_password_when_empty() -> None:
    srv, bp = _make_server_blueprint(rcon="")
    spec = build_server_spec_payload(srv, bp, [])
    for line in spec["config"]:
        assert not line.startswith("rcon_password ")
  • Step 2: Run failing tests

Run: pytest l4d2web/tests/test_l4d2_facade.py -v -k rcon_password Expected: FAIL — assertion errors (line not appended).

  • Step 3: Modify l4d2web/services/l4d2_facade.py:28-52

Replace the existing build_server_spec_payload body that ends with return { ... "config": exec_lines + json.loads(blueprint.config) } with:

def build_server_spec_payload(
    server: Server,
    blueprint: Blueprint,
    overlay_rows: list[tuple[int, str, bool]],
) -> dict:
    overlays: list[dict] = []
    for overlay_id, path, expose in overlay_rows:
        if expose:
            overlays.append({"path": path, "alias": f"overlay_{overlay_id}"})
        else:
            overlays.append({"path": path})
    # Source `exec` is last-wins. First list entry = topmost overlay = highest
    # precedence, so its exec runs LAST. Emit in reverse position order.
    exec_lines = [
        f"exec server_overlay_{overlay_id}"
        for overlay_id, _, expose in reversed(overlay_rows)
        if expose
    ]
    config_lines: list[str] = exec_lines + json.loads(blueprint.config)
    # rcon_password is appended LAST so neither overlays nor user blueprint
    # config can override it (Source's cvar semantics are last-wins).
    if server.rcon_password:
        config_lines.append(f'rcon_password "{server.rcon_password}"')
    return {
        "port": server.port,
        "overlays": overlays,
        "arguments": json.loads(blueprint.arguments),
        "config": config_lines,
    }
  • Step 4: Run, verify pass

Run: pytest l4d2web/tests/test_l4d2_facade.py -v Expected: PASS (all existing + new).

  • Step 5: Commit
git add l4d2web/services/l4d2_facade.py l4d2web/tests/test_l4d2_facade.py
git commit -m "feat(facade): append rcon_password as final server.cfg line"

Task 4: Generate rcon_password on server create

Files:

  • Modify: l4d2web/routes/server_routes.py:58-83

  • Test: l4d2web/tests/test_server_routes.py (extend)

  • Step 1: Write the failing test

Append to l4d2web/tests/test_server_routes.py:

def test_create_server_generates_rcon_password(user_client) -> None:
    res = user_client.post(
        "/servers",
        data={"name": "fresh", "blueprint_id": "1"},
        headers={"X-CSRF-Token": "test-token"},
    )
    assert res.status_code in (200, 201, 302)

    with session_scope() as db:
        row = db.scalar(select(Server).where(Server.name == "fresh"))
        assert row is not None
        assert len(row.rcon_password) >= 32

If the user_client fixture and a default blueprint seed already exist in the test suite, reuse them. If not, mirror the existing create_server test you find next to this one.

  • Step 2: Run, verify it fails

Run: pytest l4d2web/tests/test_server_routes.py -v -k rcon_password Expected: FAIL — rcon_password is empty.

  • Step 3: Modify create_server in l4d2web/routes/server_routes.py:58-83

At the top of the file, add to the imports:

import secrets

In create_server, replace the Server(...) construction (around line 87) with:

        server = Server(
            user_id=user.id,
            blueprint_id=blueprint.id,
            name=name,
            port=port,
            desired_state="stopped",
            actual_state="unknown",
            last_error="",
            rcon_password=secrets.token_urlsafe(32),
        )
  • Step 4: Run, verify pass

Run: pytest l4d2web/tests/test_server_routes.py -v Expected: PASS.

  • Step 5: Commit
git add l4d2web/routes/server_routes.py l4d2web/tests/test_server_routes.py
git commit -m "feat(servers): generate rcon_password on server create"

Task 5: Steam Web API client

Files:

  • Create: l4d2web/services/steam_users.py

  • Test: l4d2web/tests/test_steam_users.py

  • Step 1: Write the failing tests

Create l4d2web/tests/test_steam_users.py:

from __future__ import annotations

from typing import Any

import pytest

from l4d2web.services import steam_users


class _FakeResponse:
    def __init__(self, json_body: dict[str, Any], status: int = 200) -> None:
        self._body = json_body
        self.status_code = status

    def raise_for_status(self) -> None:
        if self.status_code >= 400:
            raise RuntimeError(f"http {self.status_code}")

    def json(self) -> dict[str, Any]:
        return self._body


def _patched_get(monkeypatch: pytest.MonkeyPatch, body: dict, capture: list) -> None:
    def fake_get(url: str, params: dict, timeout: float = 30.0) -> _FakeResponse:
        capture.append({"url": url, "params": params, "timeout": timeout})
        return _FakeResponse(body)

    monkeypatch.setattr(steam_users, "_session_get", fake_get)


def test_fetch_profiles_batch_builds_correct_request(monkeypatch: pytest.MonkeyPatch) -> None:
    captured: list = []
    body = {"response": {"players": [
        {"steamid": "76561197960828710", "personaname": "Alice",
         "avatarmedium": "https://avatars.../alice_medium.jpg"},
    ]}}
    _patched_get(monkeypatch, body, captured)

    profiles = steam_users.fetch_profiles_batch(
        ["76561197960828710", "76561198021234567"], api_key="KEY"
    )

    assert captured[0]["url"].endswith("/GetPlayerSummaries/v0002/")
    assert captured[0]["params"]["key"] == "KEY"
    assert captured[0]["params"]["steamids"] == "76561197960828710,76561198021234567"
    assert len(profiles) == 1
    p = profiles[0]
    assert p.steam_id_64 == "76561197960828710"
    assert p.persona_name == "Alice"
    assert p.avatar_url.endswith("alice_medium.jpg")


def test_fetch_profiles_batch_skips_private_or_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    # The Steam API simply omits non-resolvable IDs from the response. Caller
    # should accept that and return only what's there.
    body = {"response": {"players": [
        {"steamid": "76561197960828710", "personaname": "Alice",
         "avatarmedium": "https://avatars.../alice_medium.jpg"},
    ]}}
    _patched_get(monkeypatch, body, [])
    profiles = steam_users.fetch_profiles_batch(
        ["76561197960828710", "76561197999999999"], api_key="KEY"
    )
    assert len(profiles) == 1
    assert profiles[0].steam_id_64 == "76561197960828710"


def test_fetch_profiles_batch_chunks_by_100(monkeypatch: pytest.MonkeyPatch) -> None:
    ids = [str(76561197960000000 + i) for i in range(150)]
    calls: list = []
    body = {"response": {"players": []}}
    _patched_get(monkeypatch, body, calls)

    steam_users.fetch_profiles_batch(ids, api_key="KEY")

    assert len(calls) == 2
    assert calls[0]["params"]["steamids"].count(",") == 99   # 100 ids -> 99 commas
    assert calls[1]["params"]["steamids"].count(",") == 49   # 50 ids
  • Step 2: Run, verify failures

Run: pytest l4d2web/tests/test_steam_users.py -v Expected: FAIL — module missing.

  • Step 3: Implement l4d2web/services/steam_users.py
"""Steam Web API client for player profile lookups.

Mirrors the shape of l4d2web/services/steam_workshop.py:17-43:
- single thread-local requests.Session
- 30s timeout
- HTTPS only

Difference: GetPlayerSummaries requires an API key in the querystring,
unlike the anonymous workshop endpoints.
"""
from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Iterable

import requests


GET_PLAYER_SUMMARIES_URL = (
    "https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/"
)

REQUEST_TIMEOUT_SECONDS = 30.0
MAX_IDS_PER_CALL = 100

_session_local = threading.local()


def _session() -> requests.Session:
    sess = getattr(_session_local, "session", None)
    if sess is None:
        sess = requests.Session()
        _session_local.session = sess
    return sess


def _session_get(url: str, params: dict, timeout: float = REQUEST_TIMEOUT_SECONDS):
    """Indirection seam so tests can monkeypatch a fake here."""
    return _session().get(url, params=params, timeout=timeout)


@dataclass(slots=True, frozen=True)
class SteamProfile:
    steam_id_64: str
    persona_name: str
    avatar_url: str


def fetch_profiles_batch(
    steam_ids: Iterable[str], *, api_key: str
) -> list[SteamProfile]:
    """Resolve a batch of SteamID64 strings to persona name + avatar URL.

    Steam's API caps each call at 100 IDs; this helper chunks transparently.
    IDs that Steam can't resolve (private, deleted) are simply absent from
    the response and from the returned list.
    """
    ids = list(steam_ids)
    out: list[SteamProfile] = []
    for i in range(0, len(ids), MAX_IDS_PER_CALL):
        chunk = ids[i : i + MAX_IDS_PER_CALL]
        params = {"key": api_key, "steamids": ",".join(chunk)}
        resp = _session_get(GET_PLAYER_SUMMARIES_URL, params=params)
        resp.raise_for_status()
        payload = resp.json() or {}
        players = (payload.get("response") or {}).get("players") or []
        for p in players:
            out.append(
                SteamProfile(
                    steam_id_64=str(p["steamid"]),
                    persona_name=str(p.get("personaname", "")),
                    avatar_url=str(p.get("avatarmedium", "")),
                )
            )
    return out
  • Step 4: Run, verify pass

Run: pytest l4d2web/tests/test_steam_users.py -v Expected: PASS.

  • Step 5: Commit
git add l4d2web/services/steam_users.py l4d2web/tests/test_steam_users.py
git commit -m "feat(steam): add GetPlayerSummaries client"

Task 6: Live-state poller — RLE snapshot writer

Files:

  • Create: l4d2web/services/live_state_poller.py

  • Test: l4d2web/tests/test_live_state_poller.py

  • Step 1: Write the failing RLE snapshot test

Create l4d2web/tests/test_live_state_poller.py:

"""Live-state poller tests.

Each test seeds an app + DB, monkeypatches the RCON client to return a
canned StatusResponse, and asserts on what the poller writes.
"""
from __future__ import annotations

from datetime import datetime, timedelta, UTC

import pytest
from sqlalchemy import select

from l4d2web.app import create_app
from l4d2web.db import init_db, session_scope
from l4d2web.models import (
    Blueprint,
    Server,
    ServerLiveState,
    User,
)
from l4d2web.services import live_state_poller
from l4d2web.services.rcon import PlayerRow, StatusResponse


def _seed(tmp_path):
    db_url = f"sqlite:///{tmp_path/'p.db'}"
    import os
    os.environ["DATABASE_URL"] = db_url
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "x"})
    init_db()
    with session_scope() as db:
        u = User(username="u", password_digest="x"); db.add(u); db.flush()
        bp = Blueprint(user_id=u.id, name="bp", arguments="[]", config="[]"); db.add(bp); db.flush()
        s = Server(
            user_id=u.id, blueprint_id=bp.id, name="s", port=27500,
            rcon_password="pw", actual_state="running",
        )
        db.add(s); db.flush()
        return app, s.id


def _status(players: int, map_: str = "c1m1_hotel", hibernating: bool = False,
            roster: list[PlayerRow] | None = None) -> StatusResponse:
    return StatusResponse(
        map=map_, players=players, max_players=4, bots=0,
        hibernating=hibernating, roster=roster or [],
    )


def test_rle_bumps_last_seen_when_state_unchanged(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda host, port, password, timeout: _status(players=0),
    )

    with app.app_context():
        live_state_poller.poll_once()
        live_state_poller.poll_once()

    with session_scope() as db:
        rows = db.scalars(
            select(ServerLiveState).where(ServerLiveState.server_id == sid)
            .order_by(ServerLiveState.started_at)
        ).all()
    assert len(rows) == 1
    assert rows[0].players == 0
    assert rows[0].last_seen_at >= rows[0].started_at


def test_rle_inserts_new_row_on_state_change(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    snapshots = iter([_status(players=0), _status(players=1)])
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: next(snapshots),
    )

    with app.app_context():
        live_state_poller.poll_once()
        live_state_poller.poll_once()

    with session_scope() as db:
        rows = db.scalars(
            select(ServerLiveState).where(ServerLiveState.server_id == sid)
            .order_by(ServerLiveState.started_at)
        ).all()
    assert [r.players for r in rows] == [0, 1]


def test_skips_servers_without_rcon_password(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    with session_scope() as db:
        s = db.scalar(select(Server).where(Server.id == sid))
        s.rcon_password = ""

    called: list = []
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: called.append(1) or _status(0),
    )

    with app.app_context():
        live_state_poller.poll_once()

    assert called == []


def test_skips_non_running_servers(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    with session_scope() as db:
        s = db.scalar(select(Server).where(Server.id == sid))
        s.actual_state = "stopped"

    called: list = []
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: called.append(1) or _status(0),
    )

    with app.app_context():
        live_state_poller.poll_once()

    assert called == []
  • Step 2: Run, verify failures

Run: pytest l4d2web/tests/test_live_state_poller.py -v -k rle or skip Expected: FAIL — module missing.

  • Step 3: Implement the poller (snapshot-only for now)

Create l4d2web/services/live_state_poller.py:

"""Background poller that maintains live game-server state in the DB.

Modeled on l4d2web/services/job_worker.py:617-647. This module owns:
  - per-server snapshot writes with run-length encoding into
    `server_live_state`
  - player-session lifecycle in `server_player_session` (Task 7)
  - Steam profile enrichment into `steam_user_profile` (Task 8)
  - retention pruning and stuck-session closure (Task 10)

This file is built up across Tasks 6-10.
"""
from __future__ import annotations

import logging
import threading
import time
from datetime import datetime, UTC
from typing import Callable

from sqlalchemy import select

from l4d2web.db import session_scope
from l4d2web.models import (
    Server,
    ServerLiveState,
)
from l4d2web.services.rcon import RconError, StatusResponse, query_status


logger = logging.getLogger(__name__)


def _now() -> datetime:
    return datetime.now(UTC).replace(tzinfo=None)


def poll_once() -> None:
    """One pass over all running servers with a configured rcon_password."""
    with session_scope() as db:
        servers = db.scalars(
            select(Server)
            .where(Server.actual_state == "running")
            .where(Server.rcon_password != "")
        ).all()
        targets = [(s.id, s.port, s.rcon_password) for s in servers]

    for server_id, port, password in targets:
        try:
            status = query_status("127.0.0.1", port, password, timeout=2.0)
        except RconError:
            logger.warning("rcon query failed for server %d", server_id, exc_info=True)
            continue

        _record_snapshot(server_id, status)


def _record_snapshot(server_id: int, status: StatusResponse) -> None:
    """RLE write: bump last_seen_at if state matches, else insert a new row."""
    now = _now()
    with session_scope() as db:
        latest = db.scalars(
            select(ServerLiveState)
            .where(ServerLiveState.server_id == server_id)
            .order_by(ServerLiveState.started_at.desc())
            .limit(1)
        ).first()

        if latest is not None and _matches(latest, status):
            latest.last_seen_at = now
            return

        db.add(
            ServerLiveState(
                server_id=server_id,
                started_at=now,
                last_seen_at=now,
                players=status.players,
                max_players=status.max_players,
                bots=status.bots,
                map=status.map,
                hibernating=status.hibernating,
            )
        )


def _matches(row: ServerLiveState, status: StatusResponse) -> bool:
    return (
        row.players == status.players
        and row.max_players == status.max_players
        and row.bots == status.bots
        and row.map == status.map
        and row.hibernating == status.hibernating
    )
  • Step 4: Run, verify pass

Run: pytest l4d2web/tests/test_live_state_poller.py -v -k "rle or skip" Expected: PASS.

  • Step 5: Commit
git add l4d2web/services/live_state_poller.py l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): poller writes RLE snapshots to server_live_state"

Task 7: Live-state poller — session reconciliation

Files:

  • Modify: l4d2web/services/live_state_poller.py

  • Modify: l4d2web/tests/test_live_state_poller.py

  • Step 1: Write the failing session tests

Append to l4d2web/tests/test_live_state_poller.py:

from l4d2web.models import ServerPlayerSession


def _player(steam_id: str = "76561197960828710", name: str = "Alice",
            connected: int = 21, ping: int = 60) -> PlayerRow:
    return PlayerRow(
        steam_id_64=steam_id, name=name, connected_seconds=connected, ping=ping,
    )


def test_new_player_opens_session_with_backfilled_join(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: _status(players=1, roster=[_player(connected=30)]),
    )

    with app.app_context():
        live_state_poller.poll_once()

    with session_scope() as db:
        sessions = db.scalars(
            select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
        ).all()
    assert len(sessions) == 1
    s = sessions[0]
    assert s.steam_id_64 == "76561197960828710"
    assert s.name_at_join == "Alice"
    assert s.left_at is None
    assert s.min_ping == 60
    assert s.max_ping == 60
    # joined_at should be ~30s before now
    delta = (datetime.now(UTC).replace(tzinfo=None) - s.joined_at).total_seconds()
    assert 25 <= delta <= 60


def test_session_closes_when_player_no_longer_in_roster(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    snapshots = iter([
        _status(players=1, roster=[_player()]),
        _status(players=0, roster=[]),
    ])
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: next(snapshots),
    )

    with app.app_context():
        live_state_poller.poll_once()
        live_state_poller.poll_once()

    with session_scope() as db:
        sessions = db.scalars(
            select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
        ).all()
    assert len(sessions) == 1
    assert sessions[0].left_at is not None


def test_session_ping_range_extends(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    snapshots = iter([
        _status(players=1, roster=[_player(ping=60)]),
        _status(players=1, roster=[_player(ping=200)]),
        _status(players=1, roster=[_player(ping=40)]),
    ])
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: next(snapshots),
    )

    with app.app_context():
        live_state_poller.poll_once()
        live_state_poller.poll_once()
        live_state_poller.poll_once()

    with session_scope() as db:
        s = db.scalar(select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid))
    assert s.min_ping == 40
    assert s.max_ping == 200


def test_session_skips_bots(tmp_path, monkeypatch) -> None:
    # Bots have non-STEAM uniqueid; our parser already drops them, but verify
    # that even if a non-STEAM steam_id_64 makes it through, sessions filter
    # it out. We exercise the filter directly by giving a string that doesn't
    # match the expected SteamID64 numeric form.
    app, sid = _seed(tmp_path)
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: _status(
            players=0, roster=[_player(steam_id="BOT", name="Coach")]
        ),
    )

    with app.app_context():
        live_state_poller.poll_once()

    with session_scope() as db:
        sessions = db.scalars(
            select(ServerPlayerSession).where(ServerPlayerSession.server_id == sid)
        ).all()
    assert sessions == []
  • Step 2: Run, verify failures

Run: pytest l4d2web/tests/test_live_state_poller.py -v -k session Expected: FAIL.

  • Step 3: Extend live_state_poller.py with session reconciliation

Update the imports at the top:

from datetime import datetime, timedelta, UTC

from sqlalchemy import and_, select

from l4d2web.models import (
    Server,
    ServerLiveState,
    ServerPlayerSession,
)
from l4d2web.services.rcon import PlayerRow, RconError, StatusResponse, query_status

Replace the poll_once body with:

def poll_once() -> None:
    """One pass over all running servers with a configured rcon_password."""
    with session_scope() as db:
        servers = db.scalars(
            select(Server)
            .where(Server.actual_state == "running")
            .where(Server.rcon_password != "")
        ).all()
        targets = [(s.id, s.port, s.rcon_password) for s in servers]

    for server_id, port, password in targets:
        try:
            status = query_status("127.0.0.1", port, password, timeout=2.0)
        except RconError:
            logger.warning("rcon query failed for server %d", server_id, exc_info=True)
            continue

        _record_snapshot(server_id, status)
        _reconcile_sessions(server_id, status)

Add the session reconciler:

_STEAM_ID_64_PREFIX = "7656"  # all SteamID64s start with this; bots/anon do not


def _is_valid_steam_id_64(value: str) -> bool:
    return value.startswith(_STEAM_ID_64_PREFIX) and value.isdigit() and len(value) >= 16


def _reconcile_sessions(server_id: int, status: StatusResponse) -> None:
    """Open new sessions, update ping ranges, close departed sessions."""
    now = _now()
    roster = [p for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)]
    seen_ids = {p.steam_id_64 for p in roster}

    with session_scope() as db:
        open_rows = db.scalars(
            select(ServerPlayerSession).where(
                ServerPlayerSession.server_id == server_id,
                ServerPlayerSession.left_at.is_(None),
            )
        ).all()
        open_by_sid = {r.steam_id_64: r for r in open_rows}

        # Close sessions for players no longer in the roster.
        for sid, row in open_by_sid.items():
            if sid not in seen_ids:
                row.left_at = now

        # Open / update sessions for current roster.
        for p in roster:
            existing = open_by_sid.get(p.steam_id_64)
            if existing is None:
                db.add(
                    ServerPlayerSession(
                        server_id=server_id,
                        steam_id_64=p.steam_id_64,
                        joined_at=now - timedelta(seconds=p.connected_seconds),
                        left_at=None,
                        name_at_join=p.name,
                        min_ping=p.ping,
                        max_ping=p.ping,
                    )
                )
            else:
                if p.ping < existing.min_ping:
                    existing.min_ping = p.ping
                if p.ping > existing.max_ping:
                    existing.max_ping = p.ping
  • Step 4: Run, verify pass

Run: pytest l4d2web/tests/test_live_state_poller.py -v Expected: PASS (all earlier tests + new session tests).

  • Step 5: Commit
git add l4d2web/services/live_state_poller.py l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): reconcile player sessions on each poll"

Task 8: Live-state poller — Steam profile enrichment

Files:

  • Modify: l4d2web/services/live_state_poller.py

  • Modify: l4d2web/tests/test_live_state_poller.py

  • Step 1: Write the failing enrichment tests

Append to l4d2web/tests/test_live_state_poller.py:

from l4d2web.models import SteamUserProfile
from l4d2web.services import steam_users


def test_enriches_missing_profile(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    app.config["STEAM_WEB_API_KEY"] = "KEY"
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: _status(players=1, roster=[_player()]),
    )
    captured: list = []
    def fake_fetch(ids, *, api_key):
        captured.append((list(ids), api_key))
        return [steam_users.SteamProfile(
            steam_id_64="76561197960828710",
            persona_name="Alice",
            avatar_url="https://avatars.../alice_medium.jpg",
        )]
    monkeypatch.setattr(live_state_poller, "fetch_profiles_batch", fake_fetch)

    with app.app_context():
        live_state_poller.poll_once()

    assert captured and captured[0][1] == "KEY"
    with session_scope() as db:
        p = db.scalar(select(SteamUserProfile))
    assert p is not None
    assert p.persona_name == "Alice"


def test_skips_enrichment_when_api_key_unset(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    app.config["STEAM_WEB_API_KEY"] = ""
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: _status(players=1, roster=[_player()]),
    )
    monkeypatch.setattr(
        live_state_poller, "fetch_profiles_batch",
        lambda ids, *, api_key: pytest.fail("must not call without key"),
    )

    with app.app_context():
        live_state_poller.poll_once()


def test_skips_enrichment_when_cache_is_fresh(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    app.config["STEAM_WEB_API_KEY"] = "KEY"
    with session_scope() as db:
        db.add(SteamUserProfile(
            steam_id_64="76561197960828710",
            persona_name="cached",
            avatar_url="cached.jpg",
            fetched_at=datetime.now(UTC).replace(tzinfo=None),
        ))
    monkeypatch.setattr(
        live_state_poller, "query_status",
        lambda *a, **kw: _status(players=1, roster=[_player()]),
    )
    called: list = []
    monkeypatch.setattr(
        live_state_poller, "fetch_profiles_batch",
        lambda ids, *, api_key: called.append(list(ids)) or [],
    )

    with app.app_context():
        live_state_poller.poll_once()

    assert called == []
  • Step 2: Run, verify failures

Run: pytest l4d2web/tests/test_live_state_poller.py -v -k enrich or api_key or cache_is_fresh Expected: FAIL.

  • Step 3: Extend live_state_poller.py with enrichment

Update imports:

from flask import current_app

from l4d2web.models import (
    Server,
    ServerLiveState,
    ServerPlayerSession,
    SteamUserProfile,
)
from l4d2web.services.steam_users import SteamProfile, fetch_profiles_batch

Replace poll_once (now adding an enrichment phase at the end of each per-server iteration):

def poll_once() -> None:
    with session_scope() as db:
        servers = db.scalars(
            select(Server)
            .where(Server.actual_state == "running")
            .where(Server.rcon_password != "")
        ).all()
        targets = [(s.id, s.port, s.rcon_password) for s in servers]

    api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or ""
    ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400))

    for server_id, port, password in targets:
        try:
            status = query_status("127.0.0.1", port, password, timeout=2.0)
        except RconError:
            logger.warning("rcon query failed for server %d", server_id, exc_info=True)
            continue

        _record_snapshot(server_id, status)
        _reconcile_sessions(server_id, status)
        if api_key:
            _enrich_profiles(status, api_key=api_key, ttl_seconds=ttl_seconds)


def _enrich_profiles(status: StatusResponse, *, api_key: str, ttl_seconds: int) -> None:
    """Fetch+cache Steam profile data for any roster IDs missing or stale."""
    roster_ids = {p.steam_id_64 for p in status.roster if _is_valid_steam_id_64(p.steam_id_64)}
    if not roster_ids:
        return
    cutoff = _now() - timedelta(seconds=ttl_seconds)
    with session_scope() as db:
        fresh = set(db.scalars(
            select(SteamUserProfile.steam_id_64).where(
                SteamUserProfile.steam_id_64.in_(roster_ids),
                SteamUserProfile.fetched_at >= cutoff,
            )
        ).all())
    needs_fetch = sorted(roster_ids - fresh)
    if not needs_fetch:
        return
    try:
        profiles = fetch_profiles_batch(needs_fetch, api_key=api_key)
    except Exception:  # network / API errors are soft-fail
        logger.warning("steam profile enrichment failed", exc_info=True)
        return

    now = _now()
    with session_scope() as db:
        for p in profiles:
            row = db.get(SteamUserProfile, p.steam_id_64)
            if row is None:
                db.add(SteamUserProfile(
                    steam_id_64=p.steam_id_64,
                    persona_name=p.persona_name,
                    avatar_url=p.avatar_url,
                    fetched_at=now,
                ))
            else:
                row.persona_name = p.persona_name
                row.avatar_url = p.avatar_url
                row.fetched_at = now
  • Step 4: Run, verify pass

Run: pytest l4d2web/tests/test_live_state_poller.py -v Expected: PASS.

  • Step 5: Commit
git add l4d2web/services/live_state_poller.py l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): enrich roster with cached Steam profiles"

Task 9: Live-state poller — retention + stuck-session close + thread startup

Files:

  • Modify: l4d2web/services/live_state_poller.py

  • Modify: l4d2web/app.py

  • Modify: l4d2web/config.py

  • Modify: l4d2web/tests/test_live_state_poller.py

  • Step 1: Write the failing tests

Append to l4d2web/tests/test_live_state_poller.py:

def test_retention_trims_old_rows(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    app.config["LIVE_STATE_HISTORY_DAYS"] = 30
    long_ago = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=45)
    with session_scope() as db:
        db.add(ServerLiveState(
            server_id=sid, started_at=long_ago, last_seen_at=long_ago,
            players=0, max_players=4, bots=0, map="old", hibernating=True,
        ))
        db.add(ServerPlayerSession(
            server_id=sid, steam_id_64="76561197960828710",
            joined_at=long_ago, left_at=long_ago,
            name_at_join="OldPlayer", min_ping=10, max_ping=10,
        ))

    with app.app_context():
        live_state_poller.prune_history()

    with session_scope() as db:
        snaps = db.scalars(select(ServerLiveState)).all()
        sess = db.scalars(select(ServerPlayerSession)).all()
    assert snaps == []
    assert sess == []


def test_close_stuck_sessions_after_threshold(tmp_path, monkeypatch) -> None:
    app, sid = _seed(tmp_path)
    app.config["STUCK_SESSION_SECONDS"] = 60
    way_back = datetime.now(UTC).replace(tzinfo=None) - timedelta(hours=2)
    with session_scope() as db:
        db.add(ServerPlayerSession(
            server_id=sid, steam_id_64="76561197960828710",
            joined_at=way_back, left_at=None,
            name_at_join="GhostPlayer", min_ping=10, max_ping=10,
        ))

    # Server is in `targets` but the RCON call fails — i.e., we haven't seen
    # this server respond in a long time. Poller must close stuck sessions.
    def boom(*a, **kw):
        from l4d2web.services.rcon import RconError
        raise RconError("simulated outage")
    monkeypatch.setattr(live_state_poller, "query_status", boom)

    with app.app_context():
        live_state_poller.poll_once()

    with session_scope() as db:
        row = db.scalar(select(ServerPlayerSession))
    assert row.left_at is not None


def test_start_live_state_poller_skipped_during_testing(monkeypatch, tmp_path) -> None:
    from l4d2web import app as app_module
    called: list = []
    monkeypatch.setattr(
        app_module, "start_live_state_poller", lambda app: called.append(app)
    )
    app_module.create_app({"TESTING": True, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"})
    assert called == []


def test_start_live_state_poller_started_outside_testing(monkeypatch, tmp_path) -> None:
    from l4d2web import app as app_module
    called: list = []
    monkeypatch.setattr(
        app_module, "start_live_state_poller", lambda app: called.append(app)
    )
    app = app_module.create_app(
        {"TESTING": False, "DATABASE_URL": f"sqlite:///{tmp_path/'x.db'}", "SECRET_KEY": "k"}
    )
    assert called == [app]
  • Step 2: Run, verify failures

Run: pytest l4d2web/tests/test_live_state_poller.py -v -k retention or stuck or start_live_state_poller Expected: FAIL.

  • Step 3: Extend live_state_poller.py

Add to the module (after _enrich_profiles):

def prune_history() -> None:
    """Delete snapshots and closed sessions older than retention."""
    days = int(current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30))
    cutoff = _now() - timedelta(days=days)
    with session_scope() as db:
        db.query(ServerLiveState).filter(
            ServerLiveState.last_seen_at < cutoff
        ).delete(synchronize_session=False)
        db.query(ServerPlayerSession).filter(
            ServerPlayerSession.left_at.is_not(None),
            ServerPlayerSession.left_at < cutoff,
        ).delete(synchronize_session=False)


def _close_stuck_sessions(server_id: int) -> None:
    """If a server has open sessions whose joined_at is older than threshold
    AND we've failed to observe them, close them as stuck."""
    seconds = int(current_app.config.get("STUCK_SESSION_SECONDS", 60))
    cutoff = _now() - timedelta(seconds=seconds)
    with session_scope() as db:
        rows = db.scalars(
            select(ServerPlayerSession).where(
                ServerPlayerSession.server_id == server_id,
                ServerPlayerSession.left_at.is_(None),
                ServerPlayerSession.joined_at < cutoff,
            )
        ).all()
        for row in rows:
            row.left_at = _now()

Update poll_once to call _close_stuck_sessions when RCON fails:

def poll_once() -> None:
    with session_scope() as db:
        servers = db.scalars(
            select(Server)
            .where(Server.actual_state == "running")
            .where(Server.rcon_password != "")
        ).all()
        targets = [(s.id, s.port, s.rcon_password) for s in servers]

    api_key = current_app.config.get("STEAM_WEB_API_KEY", "") or ""
    ttl_seconds = int(current_app.config.get("STEAM_PROFILE_TTL_SECONDS", 86400))
    timeout = float(current_app.config.get("LIVE_STATE_QUERY_TIMEOUT_SECONDS", 2.0))

    for server_id, port, password in targets:
        try:
            status = query_status("127.0.0.1", port, password, timeout=timeout)
        except RconError:
            logger.warning("rcon query failed for server %d", server_id, exc_info=True)
            _close_stuck_sessions(server_id)
            continue

        _record_snapshot(server_id, status)
        _reconcile_sessions(server_id, status)
        if api_key:
            _enrich_profiles(status, api_key=api_key, ttl_seconds=ttl_seconds)

Add thread-start helpers at the end:

_poller_started_lock = threading.Lock()
_poller_started = False


def start_live_state_poller(app) -> None:
    """Spawn the daemon poller thread once per process."""
    global _poller_started
    with _poller_started_lock:
        if _poller_started:
            return
        _poller_started = True
        interval = float(app.config.get("LIVE_STATE_POLL_SECONDS", 5))
        retention_every = max(1, int(app.config.get("LIVE_STATE_RETENTION_EVERY_TICKS", 60)))
        thread = threading.Thread(
            target=_poller_loop,
            args=(app, interval, retention_every),
            name="left4me-live-state-poller",
            daemon=True,
        )
        thread.start()


def _poller_loop(app, interval: float, retention_every: int) -> None:
    tick = 0
    while True:
        try:
            with app.app_context():
                poll_once()
                if tick % retention_every == 0:
                    prune_history()
        except Exception:
            logger.exception("live-state poller loop exception")
        tick += 1
        time.sleep(interval)
  • Step 4: Register config keys in l4d2web/config.py

Find the existing config defaults block (mirroring JOB_WORKER_POLL_SECONDS) and add:

"LIVE_STATE_POLL_SECONDS": 5,
"LIVE_STATE_QUERY_TIMEOUT_SECONDS": 2.0,
"LIVE_STATE_STALE_SECONDS": 30,
"LIVE_STATE_HISTORY_DAYS": 30,
"LIVE_STATE_RETENTION_EVERY_TICKS": 60,
"STUCK_SESSION_SECONDS": 60,
"STEAM_PROFILE_TTL_SECONDS": 86400,
"STEAM_WEB_API_KEY": "",

The env-var loading should follow the existing pattern; if STEAM_WEB_API_KEY is sourced from web.env, ensure the existing env-loader picks it up (it likely already does via a generic mechanism — verify before duplicating logic).

  • Step 5: Wire into l4d2web/app.py

Find the line that calls start_state_poller(app) (or start_job_workers(app)). Immediately below it, add:

from l4d2web.services.live_state_poller import start_live_state_poller
# ...
if not app.config.get("TESTING"):
    start_live_state_poller(app)

If start_state_poller already runs only outside TESTING, hook in the same way.

  • Step 6: Run, verify pass

Run: pytest l4d2web/tests/test_live_state_poller.py -v Expected: PASS.

Run: pytest l4d2web/tests -q Expected: PASS (no regressions).

  • Step 7: Commit
git add l4d2web/services/live_state_poller.py l4d2web/app.py l4d2web/config.py \
        l4d2web/tests/test_live_state_poller.py
git commit -m "feat(live-state): start daemon poller, prune history, close stuck sessions"

Task 10: Server-list live-state badge

Files:

  • Modify: l4d2web/routes/page_routes.py (or wherever /servers index renders — confirm)

  • Modify: l4d2web/templates/servers.html

  • Test: l4d2web/tests/test_server_routes.py (or test_page_routes.py)

  • Step 1: Locate the servers index handler

Run: grep -n "servers\.html\|/servers\b" l4d2web/routes/*.py Note the handler function and which route renders servers.html.

  • Step 2: Write the failing test

Append to the appropriate test file:

def test_servers_index_renders_live_state_badge(user_client) -> None:
    # Seed one server with a recent snapshot, one without.
    now = datetime.now(UTC).replace(tzinfo=None)
    with session_scope() as db:
        u = db.scalar(select(User).where(User.username == "user"))
        bp = db.scalar(select(Blueprint).where(Blueprint.user_id == u.id))
        s_active = Server(user_id=u.id, blueprint_id=bp.id, name="active", port=27700,
                          rcon_password="x", actual_state="running")
        s_stale = Server(user_id=u.id, blueprint_id=bp.id, name="stale", port=27701,
                         rcon_password="x", actual_state="running")
        db.add_all([s_active, s_stale]); db.flush()
        db.add(ServerLiveState(
            server_id=s_active.id, started_at=now, last_seen_at=now,
            players=2, max_players=4, bots=0, map="c1m2_streets", hibernating=False,
        ))
        old = now - timedelta(minutes=5)
        db.add(ServerLiveState(
            server_id=s_stale.id, started_at=old, last_seen_at=old,
            players=0, max_players=4, bots=0, map="c1m1_hotel", hibernating=True,
        ))

    res = user_client.get("/servers")
    html = res.get_data(as_text=True)
    assert "2/4" in html
    assert "c1m2_streets" in html
    # stale row should not render fresh counts; just a dim placeholder
    assert "c1m1_hotel" not in html or "stale" in html.lower()
  • Step 3: Run, verify failure

Run: pytest l4d2web/tests/test_server_routes.py -v -k live_state_badge Expected: FAIL.

  • Step 4: Update the servers index handler

In the route handler (found in Step 1), after loading the user's servers, also load the latest snapshot per server in one query, and pass a live_state_by_server dict into the template. Example sketch (adapt to the existing handler shape):

from datetime import datetime, timedelta, UTC
from sqlalchemy import select, func

from l4d2web.models import ServerLiveState

# ... inside the handler, after loading `servers`:
stale_seconds = current_app.config.get("LIVE_STATE_STALE_SECONDS", 30)
cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(seconds=stale_seconds)

server_ids = [s.id for s in servers]
latest_rows: dict[int, ServerLiveState] = {}
if server_ids:
    # SQLite-friendly latest-per-server pattern.
    subq = (
        select(
            ServerLiveState.server_id,
            func.max(ServerLiveState.started_at).label("mx"),
        )
        .where(ServerLiveState.server_id.in_(server_ids))
        .group_by(ServerLiveState.server_id)
        .subquery()
    )
    rows = db.scalars(
        select(ServerLiveState).join(
            subq,
            (ServerLiveState.server_id == subq.c.server_id)
            & (ServerLiveState.started_at == subq.c.mx),
        )
    ).all()
    for r in rows:
        latest_rows[r.server_id] = r

live_state_by_server = {}
for sid, row in latest_rows.items():
    fresh = row.last_seen_at >= cutoff
    live_state_by_server[sid] = {
        "fresh": fresh,
        "players": row.players,
        "max_players": row.max_players,
        "map": row.map,
        "hibernating": row.hibernating,
    }

Pass live_state_by_server=live_state_by_server to render_template("servers.html", ...).

  • Step 5: Update l4d2web/templates/servers.html

Within the row markup for each server, add an inline live-state cell:

{% set ls = live_state_by_server.get(server.id) %}
<td class="server-live">
  {% if server.actual_state != 'running' %}
    <span class="muted">—</span>
  {% elif ls is none or not ls.fresh %}
    <span class="muted" title="no recent data">?</span>
  {% elif ls.hibernating %}
    {{ ls.players }}/{{ ls.max_players }} · idle · {{ ls.map }}
  {% else %}
    {{ ls.players }}/{{ ls.max_players }} · {{ ls.map }}
  {% endif %}
</td>

Place this within the existing per-server row, between or alongside the existing state/name cells — match the surrounding markup. Style class is just a hook for later CSS.

  • Step 6: Run, verify pass

Run: pytest l4d2web/tests/test_server_routes.py -v -k live_state_badge Expected: PASS.

Run: pytest l4d2web/tests -q Expected: PASS.

  • Step 7: Commit
git add l4d2web/routes/*.py l4d2web/templates/servers.html l4d2web/tests/test_server_routes.py
git commit -m "feat(servers): show live counts + map badge in server list"

Task 11: Server-detail live-state fragment + route

Files:

  • Modify: l4d2web/routes/server_routes.py

  • Create: l4d2web/templates/_live_state.html

  • Modify: l4d2web/templates/server_detail.html

  • Modify: response-headers / CSP setup file (locate during step 1)

  • Test: l4d2web/tests/test_server_routes.py

  • Step 1: Locate the CSP / response-headers code

Run: grep -rn "Content-Security-Policy\|img-src\|after_request" l4d2web/ If a CSP is set, find the img-src directive — we need to add the Steam avatar CDN hosts.

  • Step 2: Write the failing fragment route test

Append to l4d2web/tests/test_server_routes.py:

def test_live_state_fragment_renders_current_and_recent(user_client) -> None:
    now = datetime.now(UTC).replace(tzinfo=None)
    with session_scope() as db:
        u = db.scalar(select(User).where(User.username == "user"))
        bp = db.scalar(select(Blueprint).where(Blueprint.user_id == u.id))
        srv = Server(user_id=u.id, blueprint_id=bp.id, name="srv", port=27800,
                     rcon_password="x", actual_state="running")
        db.add(srv); db.flush()
        db.add(ServerLiveState(
            server_id=srv.id, started_at=now, last_seen_at=now,
            players=1, max_players=4, bots=0, map="c1m2_streets", hibernating=False,
        ))
        db.add(ServerPlayerSession(
            server_id=srv.id, steam_id_64="76561197960828710",
            joined_at=now - timedelta(minutes=5), left_at=None,
            name_at_join="Crone", min_ping=40, max_ping=60,
        ))
        db.add(ServerPlayerSession(
            server_id=srv.id, steam_id_64="76561198021234567",
            joined_at=now - timedelta(hours=2), left_at=now - timedelta(hours=1),
            name_at_join="OldPlayer", min_ping=20, max_ping=80,
        ))
        db.add(SteamUserProfile(
            steam_id_64="76561197960828710",
            persona_name="MrCool42",
            avatar_url="https://avatars.cloudflare.steamstatic.com/cur_medium.jpg",
            fetched_at=now,
        ))
        db.add(SteamUserProfile(
            steam_id_64="76561198021234567",
            persona_name="OldPersona",
            avatar_url="https://avatars.cloudflare.steamstatic.com/old_medium.jpg",
            fetched_at=now,
        ))
        srv_id = srv.id

    res = user_client.get(f"/servers/{srv_id}/live-state")
    assert res.status_code == 200
    html = res.get_data(as_text=True)
    # Summary
    assert "1/4" in html
    assert "c1m2_streets" in html
    # Current player block
    assert "MrCool42" in html
    assert "cur_medium.jpg" in html
    assert "40-60" in html or "4060" in html
    # Recent block — only OldPlayer, not MrCool42
    assert "OldPersona" in html
    assert "old_medium.jpg" in html
  • Step 3: Run, verify failure

Run: pytest l4d2web/tests/test_server_routes.py -v -k live_state_fragment Expected: FAIL — route 404.

  • Step 4: Add the fragment route in l4d2web/routes/server_routes.py
from datetime import datetime, timedelta, UTC

from flask import current_app, render_template
from sqlalchemy import select, func

from l4d2web.models import ServerLiveState, ServerPlayerSession, SteamUserProfile


@bp.get("/servers/<int:server_id>/live-state")
@require_login
def live_state_fragment(server_id: int) -> Response:
    user = current_user()
    assert user is not None
    with session_scope() as db:
        server = db.scalar(select(Server).where(
            Server.id == server_id, Server.user_id == user.id,
        ))
        if server is None:
            return Response(status=404)

        stale_seconds = current_app.config.get("LIVE_STATE_STALE_SECONDS", 30)
        cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(seconds=stale_seconds)

        latest = db.scalar(
            select(ServerLiveState)
            .where(ServerLiveState.server_id == server.id)
            .order_by(ServerLiveState.started_at.desc())
            .limit(1)
        )

        current_rows = db.execute(
            select(ServerPlayerSession, SteamUserProfile)
            .outerjoin(
                SteamUserProfile,
                SteamUserProfile.steam_id_64 == ServerPlayerSession.steam_id_64,
            )
            .where(
                ServerPlayerSession.server_id == server.id,
                ServerPlayerSession.left_at.is_(None),
            )
            .order_by(ServerPlayerSession.joined_at)
        ).all()

        current_ids = [r[0].steam_id_64 for r in current_rows]

        recent_cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(
            days=current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30)
        )

        recent_rows = db.execute(
            select(
                ServerPlayerSession.steam_id_64,
                func.max(ServerPlayerSession.left_at).label("last_seen"),
                ServerPlayerSession.name_at_join,
                SteamUserProfile.persona_name,
                SteamUserProfile.avatar_url,
            )
            .outerjoin(
                SteamUserProfile,
                SteamUserProfile.steam_id_64 == ServerPlayerSession.steam_id_64,
            )
            .where(
                ServerPlayerSession.server_id == server.id,
                ServerPlayerSession.left_at.is_not(None),
                ServerPlayerSession.left_at >= recent_cutoff,
                ~ServerPlayerSession.steam_id_64.in_(current_ids) if current_ids else True,
            )
            .group_by(
                ServerPlayerSession.steam_id_64,
                SteamUserProfile.persona_name,
                SteamUserProfile.avatar_url,
                ServerPlayerSession.name_at_join,
            )
            .order_by(func.max(ServerPlayerSession.left_at).desc())
            .limit(20)
        ).all()

    return render_template(
        "_live_state.html",
        server=server,
        snapshot=latest,
        snapshot_fresh=(latest is not None and latest.last_seen_at >= cutoff),
        current_players=current_rows,
        recent_players=recent_rows,
        now=datetime.now(UTC).replace(tzinfo=None),
        poll_seconds=current_app.config.get("LIVE_STATE_POLL_SECONDS", 5),
    )
  • Step 5: Create l4d2web/templates/_live_state.html
<section class="panel live-state"
         hx-get="/servers/{{ server.id }}/live-state"
         hx-trigger="every {{ poll_seconds }}s"
         hx-swap="outerHTML">
  <h2 class="section-title">Live state</h2>
  {% if not snapshot or not snapshot_fresh %}
    <p class="muted">No data — server is not currently reporting.</p>
  {% else %}
    <p class="server-live-summary">
      {{ snapshot.players }}/{{ snapshot.max_players }}
      {% if snapshot.hibernating %}· idle{% endif %}
      · {{ snapshot.map }}
      <small class="muted">
        polled {{ ((now - snapshot.last_seen_at).total_seconds() | int) }}s ago
      </small>
    </p>
  {% endif %}

  {% if current_players %}
    <h3 class="section-subtitle">Current players</h3>
    <ul class="player-grid">
      {% for session, profile in current_players %}
        <li class="player-card">
          {% if profile and profile.avatar_url %}
            <img class="avatar" src="{{ profile.avatar_url }}" alt="" loading="lazy">
          {% else %}
            <span class="avatar placeholder" aria-hidden="true"></span>
          {% endif %}
          <span class="name">{{ (profile and profile.persona_name) or session.name_at_join }}</span>
          <span class="meta">
            joined {{ ((now - session.joined_at).total_seconds() // 60) | int }}m ago
            · ping {{ session.min_ping }}-{{ session.max_ping }}ms
          </span>
        </li>
      {% endfor %}
    </ul>
  {% endif %}

  {% if recent_players %}
    <h3 class="section-subtitle">Recent players</h3>
    <ul class="player-grid recent">
      {% for row in recent_players %}
        <li class="player-card">
          {% if row.avatar_url %}
            <img class="avatar" src="{{ row.avatar_url }}" alt="" loading="lazy">
          {% else %}
            <span class="avatar placeholder" aria-hidden="true"></span>
          {% endif %}
          <span class="name">{{ row.persona_name or row.name_at_join }}</span>
          <span class="meta">
            last seen {{ ((now - row.last_seen).total_seconds() // 60) | int }}m ago
          </span>
        </li>
      {% endfor %}
    </ul>
  {% endif %}
</section>
  • Step 6: Include the partial in l4d2web/templates/server_detail.html

Below the existing actions/log sections, add:

{% include "_live_state.html" %}

The route always returns the section with its HTMX wrapper, so the include and the fragment endpoint produce identical markup.

  • Step 7: Extend the CSP

Find the existing CSP / response-headers code (located in Step 1). Add to the img-src directive:

'self'
https://avatars.cloudflare.steamstatic.com
https://avatars.akamai.steamstatic.com
https://avatars.steamstatic.com
https://steamuserimages-a.akamaihd.net

If no CSP exists, skip this step and note it in the PR description so a follow-up can address it.

  • Step 8: Run, verify pass

Run: pytest l4d2web/tests/test_server_routes.py -v -k live_state_fragment Expected: PASS.

Run: pytest l4d2web/tests -q Expected: PASS.

  • Step 9: Commit
git add l4d2web/routes/server_routes.py l4d2web/templates/_live_state.html \
        l4d2web/templates/server_detail.html l4d2web/tests/test_server_routes.py
# include the CSP file if you edited one
git commit -m "feat(servers): add live-state panel with current and recent players"

Task 12: Deploy template — Steam API key in web.env

Files:

  • Modify: deploy/templates/etc/left4me/web.env

  • Step 1: Add the new var to the env template

Open deploy/templates/etc/left4me/web.env. Append:

# Steam Web API key for ISteamUser/GetPlayerSummaries — used to resolve
# player Steam IDs to persona names + avatars in the server live-state
# panel. Free at https://steamcommunity.com/dev/apikey. Optional: if
# empty, the live-state panel still shows counts/map and the in-game
# name from RCON, just with placeholder avatars.
STEAM_WEB_API_KEY=

If web.env is templated via envsubst or similar at deploy time, also add STEAM_WEB_API_KEY to the build-side variable list.

  • Step 2: Commit
git add deploy/templates/etc/left4me/web.env
git commit -m "deploy: add STEAM_WEB_API_KEY to web.env template"

Task 13: Smoke test against prod (manual)

No code changes. Verification only.

  • Step 1: Full suite + lint

Run: pytest l4d2web/tests -q Expected: PASS.

If ruff or similar is wired into CI, run it too.

  • Step 2: Deploy to left4.me

Follow the standard deploy path (deploy/deploy-test-server.sh left4.me or whatever the project uses). After deploy:

ssh left4.me 'systemctl status left4me-web.service'
ssh left4.me 'journalctl -u left4me-web.service -n 50 --no-pager'

Expected: service is active (running), no crash-loop, journal mentions the live-state poller starting.

  • Step 3: Confirm migration backfilled passwords
ssh left4.me 'sudo -u left4me sqlite3 /var/lib/left4me/l4d2web.db \
  "SELECT id, name, length(rcon_password) FROM servers"'

Expected: every row has length >= 32.

  • Step 4: Re-initialize and restart each existing server

In the web UI, click initialize then start for each existing server. (Or the equivalent from l4d2ctl.)

Confirm rcon_password "..." is the last non-blank line in each server.cfg:

ssh left4.me 'sudo cat /opt/l4d2/instances/<name>/server.cfg | tail -5'
  • Step 5: Verify poller is writing

After ~10s:

ssh left4.me 'sudo -u left4me sqlite3 /var/lib/left4me/l4d2web.db \
  "SELECT server_id, started_at, last_seen_at, players, map, hibernating \
   FROM server_live_state ORDER BY server_id, started_at DESC LIMIT 6"'

Expected: one fresh row per server. last_seen_at is recent.

  • Step 6: Join a server from the L4D2 client

Connect to one of the prod servers. Within 5-10s:

  • /servers shows the badge change to 1/4 · <map>.
  • /servers/<id> shows your avatar + persona name + ping range in the "Current players" block.

Disconnect. Within 5-10s:

  • Badge returns to 0/4 · idle · <map>.

  • Detail page card moves from "Current" to "Recent players."

  • Step 7: Verify sessions table state

ssh left4.me 'sudo -u left4me sqlite3 /var/lib/left4me/l4d2web.db \
  "SELECT steam_id_64, name_at_join, joined_at, left_at, min_ping, max_ping \
   FROM server_player_session ORDER BY joined_at DESC LIMIT 5"'

Expected: one row per connection. left_at is non-null after disconnect; min_ping <= max_ping.

  • Step 8: Failure-path checks

a. Set STEAM_WEB_API_KEY= (empty) in /etc/left4me/web.env, restart service. Confirm the UI still renders (in-game names, placeholder avatars) and the journal has no stack traces.

b. Edit one server's rcon_password in the DB to garbage. Confirm the journal logs "rcon query failed" and the badge goes stale; other servers are unaffected.

c. Wait for the stuck-session threshold to pass. Confirm any open sessions for the broken server get closed.


Self-Review

Spec coverage: Schema (3 tables + 1 column) → Task 1. RCON client → Task 2. Spec injection → Task 3. Password generation → Task 4. Steam API → Task 5. Poller (RLE / sessions / enrichment / retention / startup) → Tasks 6-9. UI (list / detail) → Tasks 10-11. Deploy env → Task 12. Verification → Task 13.

Type consistency: StatusResponse, PlayerRow, SteamProfile, ServerLiveState, ServerPlayerSession, SteamUserProfile are referenced identically across tasks. query_status, fetch_profiles_batch, start_live_state_poller, poll_once, prune_history keep their signatures throughout. Config keys (LIVE_STATE_POLL_SECONDS, STEAM_PROFILE_TTL_SECONDS, STUCK_SESSION_SECONDS, STEAM_WEB_API_KEY, LIVE_STATE_HISTORY_DAYS, LIVE_STATE_QUERY_TIMEOUT_SECONDS, LIVE_STATE_STALE_SECONDS, LIVE_STATE_RETENTION_EVERY_TICKS) are referenced consistently.

Open items deferred to implementation time (called out explicitly in the relevant tasks rather than left as TBDs):

  • Exact CSP wiring location (Task 11 Step 1)
  • Whether l4d2web/config.py auto-loads env vars or needs explicit registration (Task 9 Step 4)
  • Existing test fixtures (user_client, default blueprint seed) — reuse if present (Tasks 4, 10, 11)
  • Servers-index handler location (Task 10 Step 1)
  • Migration test layout — extend existing test_migrations.py if it exists, otherwise create (Task 1 Step 5)

No placeholders or TODOs in test code or implementation.