"""Source RCON client + status parser. Pure stdlib. One TCP connection per query — fine at our scale (loopback ~10-20ms round-trip; pooling not worth the complexity). Source RCON wire format: size : little-endian int32 (count of the bytes that follow) req_id: little-endian int32 ptype : little-endian int32 body : utf-8 string, null-terminated pad : one extra null byte Packet types: SERVERDATA_AUTH = 3 (client -> server) SERVERDATA_EXECCOMMAND = 2 (client -> server) SERVERDATA_AUTH_RESPONSE= 2 (server -> client) SERVERDATA_RESPONSE_VALUE = 0 (server -> client) After auth, the server sends a type=0 empty packet *first* and then the type=2 auth response. req_id == -1 on the auth response = bad password. """ from __future__ import annotations import re import socket import struct from dataclasses import dataclass SERVERDATA_AUTH = 3 SERVERDATA_EXECCOMMAND = 2 SERVERDATA_AUTH_RESPONSE = 2 SERVERDATA_RESPONSE_VALUE = 0 # req_id values used by execute_command _EXEC_REQ_ID = 2 _MARKER_REQ_ID = 0xC0DE _STEAM_ID_BASE = 76561197960265728 class RconError(Exception): """Network, timeout, or protocol error.""" class RconAuthError(RconError): """The server rejected the password.""" @dataclass(slots=True, frozen=True) class PlayerRow: steam_id_64: str name: str connected_seconds: int ping: int @dataclass(slots=True, frozen=True) class StatusResponse: map: str players: int max_players: int bots: int hibernating: bool roster: list[PlayerRow] def _connect_and_auth( sock: socket.socket, host: str, port: int, password: str ) -> None: """Open TCP connection and authenticate. Raises: * RconError – on connect failure or i/o error during handshake * RconAuthError – when the server returns req_id == -1 (bad password) """ try: sock.connect((host, port)) except OSError as exc: raise RconError(f"connect failed: {exc}") from exc _send_packet(sock, 1, SERVERDATA_AUTH, password) # The server always sends a leading empty type-0 packet before the real # AUTH_RESPONSE (type-2). Drain whichever arrives first. r1 = _recv_packet(sock) r2 = _recv_packet(sock) auth = r2 if r1[1] == SERVERDATA_RESPONSE_VALUE else r1 if auth[0] == -1: raise RconAuthError("bad rcon password") def query_status( host: str, port: int, password: str, *, timeout: float = 2.0 ) -> StatusResponse: """Connect to the RCON port, authenticate, send `status`, return parsed result.""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) try: try: _connect_and_auth(sock, host, port, password) _send_packet(sock, 2, SERVERDATA_EXECCOMMAND, "status") _, _, body = _recv_packet(sock) except (OSError, socket.timeout) as exc: raise RconError(f"rcon i/o error: {exc}") from exc finally: sock.close() return parse_status(body) def execute_command( host: str, port: int, password: str, command: str, *, timeout: float = 30.0 ) -> str: """Authenticate, send a single command, return the joined reply body. Uses the trailing-marker pattern: after the exec packet, send an empty SERVERDATA_RESPONSE_VALUE packet with a sentinel req_id. Read response packets, accumulating bodies, until we see one whose req_id matches the sentinel — that guarantees the real reply is complete, because RCON processes requests in receive order. Multi-packet replies (>4096 B, like `cvarlist`) are reassembled this way. """ if not command or not command.strip(): raise ValueError("command must not be empty or whitespace-only") if "\x00" in command: raise ValueError("command must not contain null bytes") if len(command.encode("utf-8")) > 1000: raise ValueError("command exceeds maximum byte length of 1000") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) try: try: _connect_and_auth(sock, host, port, password) _send_packet(sock, _EXEC_REQ_ID, SERVERDATA_EXECCOMMAND, command) # Trailing marker: srcds processes requests in order, so its echo # of this empty packet arrives after all real command-output packets. _send_packet(sock, _MARKER_REQ_ID, SERVERDATA_RESPONSE_VALUE, "") chunks: list[str] = [] while True: req_id, _, body = _recv_packet(sock) if req_id == _MARKER_REQ_ID: break chunks.append(body) except (OSError, socket.timeout) as exc: raise RconError(f"rcon i/o error: {exc}") from exc finally: sock.close() return "".join(chunks).rstrip() def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None: body_bytes = body.encode("utf-8") + b"\x00\x00" size = 4 + 4 + len(body_bytes) sock.sendall(struct.pack(" tuple[int, int, str]: size = struct.unpack(" bytes: data = b"" while len(data) < n: chunk = sock.recv(n - len(data)) if not chunk: raise RconError("rcon connection closed") data += chunk return data # --- Status parsing ------------------------------------------------------- _MAP_RE = re.compile(r"^map\s*:\s*(\S+)", re.MULTILINE) _PLAYERS_RE = re.compile( r"^players\s*:\s*(\d+)\s+humans,\s*(\d+)\s+bots\s*\((\d+)\s+max\)" r"\s*\((not hibernating|hibernating)\)", re.MULTILINE, ) # A status player row: starts with `#`, then variable numeric prefixes, # then a quoted name, then STEAM_X:Y:Z, then connected time, then ping. _PLAYER_RE = re.compile( r'^#\s+(?:\d+\s+)+"(?P[^"]*)"\s+' r"(?PSTEAM_\d+:(?P\d+):(?P\d+))\s+" r"(?P[\d:]+)\s+" r"(?P\d+)\s+", re.MULTILINE, ) def parse_status(body: str) -> StatusResponse: map_match = _MAP_RE.search(body) if not map_match: raise RconError(f"status: no map line in response\n{body!r}") players_match = _PLAYERS_RE.search(body) if not players_match: raise RconError(f"status: no players line\n{body!r}") roster: list[PlayerRow] = [] for m in _PLAYER_RE.finditer(body): y = int(m.group("y")) z = int(m.group("z")) roster.append( PlayerRow( steam_id_64=str(_STEAM_ID_BASE + (z * 2) + y), name=m.group("name"), connected_seconds=_parse_duration(m.group("connected")), ping=int(m.group("ping")), ) ) return StatusResponse( map=map_match.group(1), players=int(players_match.group(1)), bots=int(players_match.group(2)), max_players=int(players_match.group(3)), hibernating=(players_match.group(4) == "hibernating"), roster=roster, ) def _parse_duration(text: str) -> int: """Parse Source's connected duration: HH:MM:SS or MM:SS -> seconds.""" try: parts = [int(p) for p in text.split(":")] except ValueError as exc: raise RconError(f"unparseable connected duration: {text!r}") from exc if len(parts) == 2: return parts[0] * 60 + parts[1] if len(parts) == 3: return parts[0] * 3600 + parts[1] * 60 + parts[2] raise RconError(f"unparseable connected duration: {text!r}")