left4me/l4d2web/services/rcon.py
mwiegand 83d2a9932c
refactor(rcon): harden _parse_duration; surface fixture handler errors
- _parse_duration wraps int() in try/except so malformed connected
  durations raise RconError (not ValueError leaking past the poller's
  except RconError).
- fake_rcon_server captures handler exceptions and re-raises at context
  exit, so a buggy test handler surfaces as a real failure instead of
  silently degrading into a client-side timeout.
- Two new parser tests: HH:MM:SS duration parsing and malformed input
  coverage.
- Fix Steam ID formula typo in the spec doc (Z*2 + Y, not Y*2 + Z; Y is
  the low bit). Code was already correct.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 21:39:32 +02:00

179 lines
5.4 KiB
Python

"""Source RCON client + status parser.

Pure stdlib. One TCP connection per query — fine at our scale (loopback
~10-20ms round-trip; pooling not worth the complexity).

Source RCON wire format:
    size  : little-endian int32 (count of the bytes that follow)
    req_id: little-endian int32
    ptype : little-endian int32
    body  : utf-8 string, null-terminated
    pad   : one extra null byte

Packet types:
    SERVERDATA_AUTH           = 3 (client -> server)
    SERVERDATA_EXECCOMMAND    = 2 (client -> server)
    SERVERDATA_AUTH_RESPONSE  = 2 (server -> client)
    SERVERDATA_RESPONSE_VALUE = 0 (server -> client)

After auth, the server sends a type=0 empty packet *first* and then the
type=2 auth response. req_id == -1 on the auth response = bad password.
"""
from __future__ import annotations
import re
import socket
import struct
from dataclasses import dataclass
# Packet type codes of the Source RCON protocol (directions are listed in the
# module docstring). The value 2 is deliberately shared: it means EXECCOMMAND
# when sent by the client and AUTH_RESPONSE when sent by the server.
SERVERDATA_AUTH = 3
SERVERDATA_EXECCOMMAND = 2
SERVERDATA_AUTH_RESPONSE = 2
SERVERDATA_RESPONSE_VALUE = 0
# Offset added to (Z * 2 + Y) of a STEAM_X:Y:Z id to obtain the 64-bit Steam ID
# (see parse_status). Equal to 0x0110000100000000.
_STEAM_ID_BASE = 76561197960265728
class RconError(Exception):
    """Base class for every failure this module reports.

    Raised for connection and I/O problems (including timeouts), for a
    connection closed mid-packet, and for `status` output that cannot be
    parsed.
    """
class RconAuthError(RconError):
    """Authentication failed: the server did not accept the RCON password.

    Subclasses RconError, so callers that only catch RconError still see it.
    """
@dataclass(slots=True, frozen=True)
class PlayerRow:
steam_id_64: str
name: str
connected_seconds: int
ping: int
@dataclass(slots=True, frozen=True)
class StatusResponse:
map: str
players: int
max_players: int
bots: int
hibernating: bool
roster: list[PlayerRow]
def query_status(
    host: str, port: int, password: str, *, timeout: float = 2.0
) -> StatusResponse:
    """Connect to the RCON port, authenticate, send `status`, return parsed result.

    Args:
        host: Address of the game server's RCON listener (IPv4 / AF_INET).
        port: TCP port of the RCON listener.
        password: RCON password sent in the SERVERDATA_AUTH packet.
        timeout: Socket timeout in seconds, applied to connect and to every
            send/receive on the connection.

    Returns:
        The StatusResponse produced by parse_status() from the reply body.

    Raises:
        RconAuthError: The auth response carried req_id == -1 (bad password).
        RconError: Connect failure, socket I/O error or timeout, connection
            closed mid-packet, a malformed packet, or unparseable `status`
            output.
    """
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.connect((host, port))
        except OSError as exc:
            raise RconError(f"connect failed: {exc}") from exc
        try:
            _send_packet(sock, 1, SERVERDATA_AUTH, password)
            # The server normally sends an empty type-0 packet before the real
            # auth response. Only read a second packet when the first one is
            # that filler; reading two unconditionally would block until the
            # timeout against a server that answers with the auth response
            # alone.
            auth_id, first_type, _ = _recv_packet(sock)
            if first_type == SERVERDATA_RESPONSE_VALUE:
                auth_id, _, _ = _recv_packet(sock)
            if auth_id == -1:
                raise RconAuthError("bad rcon password")
            _send_packet(sock, 2, SERVERDATA_EXECCOMMAND, "status")
            _, _, body = _recv_packet(sock)
        except struct.error as exc:
            # A truncated or garbled packet must surface as RconError so that
            # callers catching RconError see every failure mode.
            raise RconError(f"malformed rcon packet: {exc}") from exc
        except OSError as exc:
            # socket.timeout is a subclass of OSError, so timeouts land here.
            raise RconError(f"rcon i/o error: {exc}") from exc
    # Parse only after the connection has been released.
    return parse_status(body)
def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None:
    """Frame one RCON packet and write it to *sock* in a single sendall().

    The frame is three little-endian int32 values (size, req_id, ptype)
    followed by the utf-8 body, its null terminator, and one pad null byte.
    The size field counts everything after itself.
    """
    # Body terminator plus the extra pad byte: two nulls in total.
    payload = body.encode("utf-8") + b"\x00\x00"
    # 8 = the req_id and ptype fields that precede the payload.
    header = struct.pack("<iii", 8 + len(payload), req_id, ptype)
    sock.sendall(header + payload)
# Sanity bound on the size field of an incoming packet. Deliberately generous:
# it only exists so a corrupt size cannot be passed to recv() as a buffer size.
_MAX_PACKET_SIZE = 1 << 20


def _recv_packet(sock: socket.socket) -> tuple[int, int, str]:
    """Read one RCON packet from *sock*.

    Returns:
        A (req_id, ptype, body) tuple. Trailing null bytes are stripped from
        the body, which is decoded as utf-8 with undecodable bytes replaced.

    Raises:
        RconError: The announced size is smaller than the 8-byte
            req_id/ptype header or larger than _MAX_PACKET_SIZE, or the
            connection closes before the packet is complete (raised by
            _recvall). Socket errors such as timeouts propagate unchanged.
    """
    (size,) = struct.unpack("<i", _recvall(sock, 4))
    # Without this check a negative or tiny size would make the header unpack
    # below fail with struct.error instead of RconError.
    if not 8 <= size <= _MAX_PACKET_SIZE:
        raise RconError(f"rcon packet has invalid size: {size}")
    payload = _recvall(sock, size)
    req_id, ptype = struct.unpack_from("<ii", payload)
    body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
    return req_id, ptype, body
def _recvall(sock: socket.socket, n: int) -> bytes:
    """Read exactly *n* bytes from *sock*, looping over short reads.

    Returns the bytes read (empty when *n* is not positive). Raises RconError
    if recv() returns no data before *n* bytes have arrived, i.e. the
    connection was closed.
    """
    buffer = bytearray()
    remaining = n
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise RconError("rcon connection closed")
        buffer += piece
        remaining -= len(piece)
    return bytes(buffer)
# --- Status parsing -------------------------------------------------------
# All three patterns use re.MULTILINE so `^` anchors at the start of any line
# of the multi-line `status` output.

# The `map :` line; group 1 is the first whitespace-delimited token after the
# colon.
_MAP_RE = re.compile(r"^map\s*:\s*(\S+)", re.MULTILINE)
# The `players :` line. Groups: 1 = human count, 2 = bot count, 3 = max
# players, 4 = the literal "hibernating" or "not hibernating".
_PLAYERS_RE = re.compile(
    r"^players\s*:\s*(\d+)\s+humans,\s*(\d+)\s+bots\s*\((\d+)\s+max\)"
    r"\s*\((not hibernating|hibernating)\)",
    re.MULTILINE,
)
# A status player row: starts with `#`, then variable numeric prefixes,
# then a quoted name, then STEAM_X:Y:Z, then connected time, then ping.
# Named groups: name, sid (the whole STEAM_X:Y:Z token), y, z, connected, ping.
# Rows without a STEAM_X:Y:Z token do not match and are therefore skipped.
# Whitespace is required after the ping column, so more columns must follow.
_PLAYER_RE = re.compile(
    r'^#\s+(?:\d+\s+)+"(?P<name>[^"]*)"\s+'
    r"(?P<sid>STEAM_\d+:(?P<y>\d+):(?P<z>\d+))\s+"
    r"(?P<connected>[\d:]+)\s+"
    r"(?P<ping>\d+)\s+",
    re.MULTILINE,
)
def parse_status(body: str) -> StatusResponse:
    """Turn the raw text reply of the `status` command into a StatusResponse.

    Args:
        body: Full text of the `status` reply.

    Returns:
        A StatusResponse with the map name, the player/bot/max counts, the
        hibernation flag, and one PlayerRow per roster line matched by
        _PLAYER_RE.

    Raises:
        RconError: The `map` or `players` line is missing, or a roster row has
            a connected duration that _parse_duration rejects.
    """
    map_hit = _MAP_RE.search(body)
    if map_hit is None:
        raise RconError(f"status: no map line in response\n{body!r}")

    counts = _PLAYERS_RE.search(body)
    if counts is None:
        raise RconError(f"status: no players line\n{body!r}")
    humans, bot_count, capacity, hibernation = counts.groups()

    # 64-bit Steam ID = base + Z * 2 + Y for a STEAM_X:Y:Z id (Y is the low bit).
    roster = [
        PlayerRow(
            steam_id_64=str(_STEAM_ID_BASE + 2 * int(row["z"]) + int(row["y"])),
            name=row["name"],
            connected_seconds=_parse_duration(row["connected"]),
            ping=int(row["ping"]),
        )
        for row in _PLAYER_RE.finditer(body)
    ]

    return StatusResponse(
        map=map_hit.group(1),
        players=int(humans),
        max_players=int(capacity),
        bots=int(bot_count),
        hibernating=hibernation == "hibernating",
        roster=roster,
    )
def _parse_duration(text: str) -> int:
"""Parse Source's connected duration: HH:MM:SS or MM:SS -> seconds."""
try:
parts = [int(p) for p in text.split(":")]
except ValueError as exc:
raise RconError(f"unparseable connected duration: {text!r}") from exc
if len(parts) == 2:
return parts[0] * 60 + parts[1]
if len(parts) == 3:
return parts[0] * 3600 + parts[1] * 60 + parts[2]
raise RconError(f"unparseable connected duration: {text!r}")