left4me/l4d2web/services/rcon.py
mwiegand 9ef9ffdbde
chore(l4d2-web): clarify rcon req_id constants and helper docstring
Add comment noting _EXEC_REQ_ID/_MARKER_REQ_ID are arbitrary client-chosen
values unrelated to SERVERDATA_* packet-type constants. Update _connect_and_auth
docstring to accurately reflect that OSError/socket.timeout propagate raw from
post-connect send/recv, while only connect failure is wrapped in RconError.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 21:24:41 +02:00

243 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Source RCON client + status parser.
Pure stdlib. One TCP connection per query — fine at our scale (loopback
~10-20ms round-trip; pooling not worth the complexity).
Source RCON wire format:
size : little-endian int32 (count of the bytes that follow)
req_id: little-endian int32
ptype : little-endian int32
body : utf-8 string, null-terminated
pad : one extra null byte
Packet types:
SERVERDATA_AUTH = 3 (client -> server)
SERVERDATA_EXECCOMMAND = 2 (client -> server)
SERVERDATA_AUTH_RESPONSE= 2 (server -> client)
SERVERDATA_RESPONSE_VALUE = 0 (server -> client)
After auth, the server sends a type=0 empty packet *first* and then the
type=2 auth response. req_id == -1 on the auth response = bad password.
"""
from __future__ import annotations
import re
import socket
import struct
from dataclasses import dataclass
SERVERDATA_AUTH = 3
SERVERDATA_EXECCOMMAND = 2
SERVERDATA_AUTH_RESPONSE = 2
SERVERDATA_RESPONSE_VALUE = 0
# req_id values for execute_command's exec + marker packets.
# These are arbitrary positive ints chosen by the client; the values
# happen to be unrelated to the SERVERDATA_* packet-type constants.
_EXEC_REQ_ID = 2
_MARKER_REQ_ID = 0xC0DE
_STEAM_ID_BASE = 76561197960265728
class RconError(Exception):
"""Network, timeout, or protocol error."""
class RconAuthError(RconError):
"""The server rejected the password."""
@dataclass(slots=True, frozen=True)
class PlayerRow:
steam_id_64: str
name: str
connected_seconds: int
ping: int
@dataclass(slots=True, frozen=True)
class StatusResponse:
map: str
players: int
max_players: int
bots: int
hibernating: bool
roster: list[PlayerRow]
def _connect_and_auth(
sock: socket.socket, host: str, port: int, password: str
) -> None:
"""Open TCP connection and authenticate.
Raises:
RconError on connect failure
RconAuthError when the server returns req_id == -1 (bad password)
OSError / socket.timeout raw, from the post-connect send/recv;
callers wrap these into RconError.
"""
try:
sock.connect((host, port))
except OSError as exc:
raise RconError(f"connect failed: {exc}") from exc
_send_packet(sock, 1, SERVERDATA_AUTH, password)
# The server always sends a leading empty type-0 packet before the real
# AUTH_RESPONSE (type-2). Drain whichever arrives first.
r1 = _recv_packet(sock)
r2 = _recv_packet(sock)
auth = r2 if r1[1] == SERVERDATA_RESPONSE_VALUE else r1
if auth[0] == -1:
raise RconAuthError("bad rcon password")
def query_status(
host: str, port: int, password: str, *, timeout: float = 2.0
) -> StatusResponse:
"""Connect to the RCON port, authenticate, send `status`, return parsed result."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
try:
_connect_and_auth(sock, host, port, password)
_send_packet(sock, 2, SERVERDATA_EXECCOMMAND, "status")
_, _, body = _recv_packet(sock)
except (OSError, socket.timeout) as exc:
raise RconError(f"rcon i/o error: {exc}") from exc
finally:
sock.close()
return parse_status(body)
def execute_command(
host: str, port: int, password: str, command: str, *, timeout: float = 30.0
) -> str:
"""Authenticate, send a single command, return the joined reply body.
Uses the trailing-marker pattern: after the exec packet, send an empty
SERVERDATA_RESPONSE_VALUE packet with a sentinel req_id. Read response
packets, accumulating bodies, until we see one whose req_id matches the
sentinel — that guarantees the real reply is complete, because RCON
processes requests in receive order. Multi-packet replies (>4096 B, like
`cvarlist`) are reassembled this way.
"""
if not command or not command.strip():
raise ValueError("command must not be empty or whitespace-only")
if "\x00" in command:
raise ValueError("command must not contain null bytes")
if len(command.encode("utf-8")) > 1000:
raise ValueError("command exceeds maximum byte length of 1000")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
try:
_connect_and_auth(sock, host, port, password)
_send_packet(sock, _EXEC_REQ_ID, SERVERDATA_EXECCOMMAND, command)
# Trailing marker: srcds processes requests in order, so its echo
# of this empty packet arrives after all real command-output packets.
_send_packet(sock, _MARKER_REQ_ID, SERVERDATA_RESPONSE_VALUE, "")
chunks: list[str] = []
while True:
req_id, _, body = _recv_packet(sock)
if req_id == _MARKER_REQ_ID:
break
chunks.append(body)
except (OSError, socket.timeout) as exc:
raise RconError(f"rcon i/o error: {exc}") from exc
finally:
sock.close()
return "".join(chunks).rstrip()
def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None:
body_bytes = body.encode("utf-8") + b"\x00\x00"
size = 4 + 4 + len(body_bytes)
sock.sendall(struct.pack("<iii", size, req_id, ptype) + body_bytes)
def _recv_packet(sock: socket.socket) -> tuple[int, int, str]:
size = struct.unpack("<i", _recvall(sock, 4))[0]
payload = _recvall(sock, size)
req_id, ptype = struct.unpack("<ii", payload[:8])
body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
return req_id, ptype, body
def _recvall(sock: socket.socket, n: int) -> bytes:
data = b""
while len(data) < n:
chunk = sock.recv(n - len(data))
if not chunk:
raise RconError("rcon connection closed")
data += chunk
return data
# --- Status parsing -------------------------------------------------------
_MAP_RE = re.compile(r"^map\s*:\s*(\S+)", re.MULTILINE)
_PLAYERS_RE = re.compile(
r"^players\s*:\s*(\d+)\s+humans,\s*(\d+)\s+bots\s*\((\d+)\s+max\)"
r"\s*\((not hibernating|hibernating)\)",
re.MULTILINE,
)
# A status player row: starts with `#`, then variable numeric prefixes,
# then a quoted name, then STEAM_X:Y:Z, then connected time, then ping.
_PLAYER_RE = re.compile(
r'^#\s+(?:\d+\s+)+"(?P<name>[^"]*)"\s+'
r"(?P<sid>STEAM_\d+:(?P<y>\d+):(?P<z>\d+))\s+"
r"(?P<connected>[\d:]+)\s+"
r"(?P<ping>\d+)\s+",
re.MULTILINE,
)
def parse_status(body: str) -> StatusResponse:
map_match = _MAP_RE.search(body)
if not map_match:
raise RconError(f"status: no map line in response\n{body!r}")
players_match = _PLAYERS_RE.search(body)
if not players_match:
raise RconError(f"status: no players line\n{body!r}")
roster: list[PlayerRow] = []
for m in _PLAYER_RE.finditer(body):
y = int(m.group("y"))
z = int(m.group("z"))
roster.append(
PlayerRow(
steam_id_64=str(_STEAM_ID_BASE + (z * 2) + y),
name=m.group("name"),
connected_seconds=_parse_duration(m.group("connected")),
ping=int(m.group("ping")),
)
)
return StatusResponse(
map=map_match.group(1),
players=int(players_match.group(1)),
bots=int(players_match.group(2)),
max_players=int(players_match.group(3)),
hibernating=(players_match.group(4) == "hibernating"),
roster=roster,
)
def _parse_duration(text: str) -> int:
"""Parse Source's connected duration: HH:MM:SS or MM:SS -> seconds."""
try:
parts = [int(p) for p in text.split(":")]
except ValueError as exc:
raise RconError(f"unparseable connected duration: {text!r}") from exc
if len(parts) == 2:
return parts[0] * 60 + parts[1]
if len(parts) == 3:
return parts[0] * 3600 + parts[1] * 60 + parts[2]
raise RconError(f"unparseable connected duration: {text!r}")