176 lines
5.3 KiB
Python
176 lines
5.3 KiB
Python
"""Source RCON client + status parser.
|
|
|
|
Pure stdlib. One TCP connection per query — fine at our scale (loopback
|
|
~10-20ms round-trip; pooling not worth the complexity).
|
|
|
|
Source RCON wire format:
|
|
size : little-endian int32 (count of the bytes that follow)
|
|
req_id: little-endian int32
|
|
ptype : little-endian int32
|
|
body : utf-8 string, null-terminated
|
|
pad : one extra null byte
|
|
|
|
Packet types:
|
|
SERVERDATA_AUTH = 3 (client -> server)
|
|
SERVERDATA_EXECCOMMAND = 2 (client -> server)
|
|
SERVERDATA_AUTH_RESPONSE= 2 (server -> client)
|
|
SERVERDATA_RESPONSE_VALUE = 0 (server -> client)
|
|
|
|
After auth, the server sends a type=0 empty packet *first* and then the
|
|
type=2 auth response. req_id == -1 on the auth response = bad password.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import socket
|
|
import struct
|
|
from dataclasses import dataclass
|
|
|
|
|
|
# Packet type codes. Note that 2 is shared: it means EXECCOMMAND when sent by
# the client and AUTH_RESPONSE when sent by the server.
SERVERDATA_RESPONSE_VALUE = 0  # server -> client
SERVERDATA_AUTH_RESPONSE = 2  # server -> client
SERVERDATA_EXECCOMMAND = 2  # client -> server
SERVERDATA_AUTH = 3  # client -> server

# Offset added to (2 * Z + Y) when turning a STEAM_X:Y:Z id into its 64-bit form.
_STEAM_ID_BASE = 76_561_197_960_265_728
|
|
|
|
|
|
class RconError(Exception):
    """Raised for a network failure, a timeout, or a protocol-level problem."""
|
|
|
|
|
|
class RconAuthError(RconError):
    """Raised when the server turns down the supplied RCON password."""
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class PlayerRow:
|
|
steam_id_64: str
|
|
name: str
|
|
connected_seconds: int
|
|
ping: int
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class StatusResponse:
|
|
map: str
|
|
players: int
|
|
max_players: int
|
|
bots: int
|
|
hibernating: bool
|
|
roster: list[PlayerRow]
|
|
|
|
|
|
def query_status(
    host: str, port: int, password: str, *, timeout: float = 2.0
) -> StatusResponse:
    """Connect to the RCON port, authenticate, send `status`, return parsed result.

    Args:
        host: Address of the game server (an IPv4 TCP socket is used).
        port: RCON TCP port.
        password: RCON password, sent in the auth packet.
        timeout: Socket timeout in seconds; applies to the connect and to
            every individual send/receive.

    Returns:
        The parsed `status` output.

    Raises:
        RconAuthError: The auth response carried req_id == -1 (bad password).
        RconError: Connecting failed, socket I/O failed or timed out, the
            peer closed the connection, a packet was malformed, or the
            `status` text could not be parsed.
    """
    auth_id, command_id = 1, 2

    # The context manager closes the socket on every exit path, including a
    # failure in settimeout().
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.connect((host, port))
        except OSError as exc:
            raise RconError(f"connect failed: {exc}") from exc

        try:
            _send_packet(sock, auth_id, SERVERDATA_AUTH, password)
            # The server normally sends an empty type-0 packet before the auth
            # response. Only read a second packet when the first one really is
            # that type-0 packet; reading unconditionally would block until
            # the timeout on a server that replies with the auth response alone.
            reply = _recv_packet(sock)
            if reply[1] == SERVERDATA_RESPONSE_VALUE:
                reply = _recv_packet(sock)
            if reply[0] == -1:
                raise RconAuthError("bad rcon password")

            _send_packet(sock, command_id, SERVERDATA_EXECCOMMAND, "status")
            reply_id, _, body = _recv_packet(sock)
            if reply_id == auth_id:
                # A packet still tagged with the auth request id is a leftover
                # from the auth phase (e.g. the empty packet arriving after the
                # auth response); the command reply is the next one.
                _, _, body = _recv_packet(sock)
        except OSError as exc:
            # socket.timeout is a subclass of OSError, so timeouts land here too.
            raise RconError(f"rcon i/o error: {exc}") from exc
        except struct.error as exc:
            # A packet too short to unpack is a protocol error, not a crash.
            raise RconError(f"malformed rcon packet: {exc}") from exc

    return parse_status(body)
|
|
|
|
|
|
def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None:
    """Frame a single RCON packet and write it to *sock* with one sendall()."""
    # Encoded body followed by its null terminator and the one-byte pad.
    tail = b"".join((body.encode("utf-8"), b"\x00", b"\x00"))
    # The size field counts everything after itself: two int32 fields + tail.
    header = struct.pack("<iii", len(tail) + 8, req_id, ptype)
    sock.sendall(header + tail)
|
|
|
|
|
|
def _recv_packet(sock: socket.socket) -> tuple[int, int, str]:
    """Read one RCON packet from *sock*.

    Returns:
        ``(req_id, ptype, body)``. Trailing null bytes are stripped from the
        body, which is decoded as UTF-8 with undecodable bytes replaced.

    Raises:
        RconError: The size field cannot hold the two int32 header fields or
            is implausibly large, or the connection closed mid-packet.
    """
    # Sanity cap, far above the 4096-byte packets the protocol documents, so a
    # garbage size field cannot trigger a huge read.
    max_size = 1 << 20

    (size,) = struct.unpack("<i", _recvall(sock, 4))
    # Below 8 bytes there is no room for req_id + ptype; without this check a
    # negative or tiny size surfaces as a raw struct.error further down.
    if not 8 <= size <= max_size:
        raise RconError(f"rcon packet has invalid size field: {size}")

    payload = _recvall(sock, size)
    req_id, ptype = struct.unpack_from("<ii", payload)
    body = payload[8:].rstrip(b"\x00").decode("utf-8", errors="replace")
    return req_id, ptype, body
|
|
|
|
|
|
def _recvall(sock: socket.socket, n: int) -> bytes:
    """Read exactly *n* bytes from *sock*, looping over short reads.

    Raises RconError if recv() returns no data before *n* bytes have arrived.
    """
    pieces: list[bytes] = []
    missing = n
    while missing > 0:
        piece = sock.recv(missing)
        if not piece:
            raise RconError("rcon connection closed")
        pieces.append(piece)
        missing -= len(piece)
    return b"".join(pieces)
|
|
|
|
|
|
# --- Status parsing -------------------------------------------------------
|
|
|
|
# `map : <name> ...` line; captures the first non-space token after the colon.
_MAP_RE = re.compile(r"^map\s*:\s*(\S+)", re.MULTILINE)
# `players : H humans, B bots (M max) (hibernating|not hibernating)` line.
# Groups: 1 = humans, 2 = bots, 3 = max, 4 = hibernation text.
_PLAYERS_RE = re.compile(
    r"^players\s*:\s*(\d+)\s+humans,\s*(\d+)\s+bots\s*\((\d+)\s+max\)"
    r"\s*\((not hibernating|hibernating)\)",
    re.MULTILINE,
)
# A status player row: starts with `#`, then variable numeric prefixes,
# then a quoted name, then STEAM_X:Y:Z, then connected time, then ping.
#
# The name is matched greedily (`.*`, which never crosses a newline) so that
# the closing quote is the LAST `"` on the line that is followed by a
# STEAM_X:Y:Z id. With `[^"]*` a name containing `"` made the row vanish, and
# a name shaped like `x" STEAM_1:0:999 00:01 5 ` could pass off a fake id.
# NOTE(review): assumes the server can emit `"` inside names — confirm.
_PLAYER_RE = re.compile(
    r'^#\s+(?:\d+\s+)+"(?P<name>.*)"\s+'
    r"(?P<sid>STEAM_\d+:(?P<y>\d+):(?P<z>\d+))\s+"
    r"(?P<connected>[\d:]+)\s+"
    r"(?P<ping>\d+)\s+",
    re.MULTILINE,
)
|
|
|
|
|
|
def parse_status(body: str) -> StatusResponse:
    """Turn the text of a `status` reply into a StatusResponse.

    The `map :` and `players :` lines are mandatory; every line matching the
    player-row pattern becomes one roster entry. Each STEAM_X:Y:Z id is
    converted to its 64-bit form as ``_STEAM_ID_BASE + 2 * Z + Y``.

    Raises:
        RconError: The map line or the players line is missing, or a row's
            connected duration cannot be parsed.
    """
    if (map_hit := _MAP_RE.search(body)) is None:
        raise RconError(f"status: no map line in response\n{body!r}")
    if (count_hit := _PLAYERS_RE.search(body)) is None:
        raise RconError(f"status: no players line\n{body!r}")

    humans, bot_count, capacity, hibernation = count_hit.group(1, 2, 3, 4)

    roster = [
        PlayerRow(
            steam_id_64=str(_STEAM_ID_BASE + 2 * int(row["z"]) + int(row["y"])),
            name=row["name"],
            connected_seconds=_parse_duration(row["connected"]),
            ping=int(row["ping"]),
        )
        for row in _PLAYER_RE.finditer(body)
    ]

    return StatusResponse(
        map=map_hit.group(1),
        players=int(humans),
        max_players=int(capacity),
        bots=int(bot_count),
        hibernating=hibernation == "hibernating",
        roster=roster,
    )
|
|
|
|
|
|
def _parse_duration(text: str) -> int:
    """Parse Source's connected duration: HH:MM:SS or MM:SS -> seconds.

    Raises:
        RconError: *text* does not have two or three colon-separated fields,
            or a field is not an integer (e.g. ``"12:"`` or ``"::"``).
    """
    try:
        fields = [int(piece) for piece in text.split(":")]
    except ValueError as exc:
        # An empty or non-numeric field would otherwise leak a bare ValueError
        # past callers that only handle RconError.
        raise RconError(f"unparseable connected duration: {text!r}") from exc

    if len(fields) == 2:
        minutes, seconds = fields
        return minutes * 60 + seconds
    if len(fields) == 3:
        hours, minutes, seconds = fields
        return hours * 3600 + minutes * 60 + seconds
    raise RconError(f"unparseable connected duration: {text!r}")
|