diff --git a/l4d2web/services/rcon.py b/l4d2web/services/rcon.py index ea5e74f..49bb699 100644 --- a/l4d2web/services/rcon.py +++ b/l4d2web/services/rcon.py @@ -32,6 +32,10 @@ SERVERDATA_EXECCOMMAND = 2 SERVERDATA_AUTH_RESPONSE = 2 SERVERDATA_RESPONSE_VALUE = 0 +# req_id values used by execute_command +_EXEC_REQ_ID = 2 +_MARKER_REQ_ID = 0xC0DE + _STEAM_ID_BASE = 76561197960265728 @@ -61,6 +65,29 @@ class StatusResponse: roster: list[PlayerRow] +def _connect_and_auth( + sock: socket.socket, host: str, port: int, password: str +) -> None: + """Open TCP connection and authenticate. Raises: + + * RconError – on connect failure or i/o error during handshake + * RconAuthError – when the server returns req_id == -1 (bad password) + """ + try: + sock.connect((host, port)) + except OSError as exc: + raise RconError(f"connect failed: {exc}") from exc + + _send_packet(sock, 1, SERVERDATA_AUTH, password) + # The server always sends a leading empty type-0 packet before the real + # AUTH_RESPONSE (type-2). Drain whichever arrives first. + r1 = _recv_packet(sock) + r2 = _recv_packet(sock) + auth = r2 if r1[1] == SERVERDATA_RESPONSE_VALUE else r1 + if auth[0] == -1: + raise RconAuthError("bad rcon password") + + def query_status( host: str, port: int, password: str, *, timeout: float = 2.0 ) -> StatusResponse: @@ -69,19 +96,7 @@ def query_status( sock.settimeout(timeout) try: try: - sock.connect((host, port)) - except OSError as exc: - raise RconError(f"connect failed: {exc}") from exc - - try: - _send_packet(sock, 1, SERVERDATA_AUTH, password) - # Drain the leading empty type-0 packet; then read the real auth response. - r1 = _recv_packet(sock) - r2 = _recv_packet(sock) - auth = r2 if r1[1] == SERVERDATA_RESPONSE_VALUE else r1 - if auth[0] == -1: - raise RconAuthError("bad rcon password") - + _connect_and_auth(sock, host, port, password) _send_packet(sock, 2, SERVERDATA_EXECCOMMAND, "status") _, _, body = _recv_packet(sock) except (OSError, socket.timeout) as exc: @@ -92,6 +107,50 @@ def query_status( return parse_status(body) +def execute_command( + host: str, port: int, password: str, command: str, *, timeout: float = 30.0 +) -> str: + """Authenticate, send a single command, return the joined reply body. + + Uses the trailing-marker pattern: after the exec packet, send an empty + SERVERDATA_RESPONSE_VALUE packet with a sentinel req_id. Read response + packets, accumulating bodies, until we see one whose req_id matches the + sentinel — that guarantees the real reply is complete, because RCON + processes requests in receive order. Multi-packet replies (>4096 B, like + `cvarlist`) are reassembled this way. + """ + if not command or not command.strip(): + raise ValueError("command must not be empty or whitespace-only") + if "\x00" in command: + raise ValueError("command must not contain null bytes") + if len(command.encode("utf-8")) > 1000: + raise ValueError("command exceeds maximum byte length of 1000") + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + try: + try: + _connect_and_auth(sock, host, port, password) + + _send_packet(sock, _EXEC_REQ_ID, SERVERDATA_EXECCOMMAND, command) + # Trailing marker: srcds processes requests in order, so its echo + # of this empty packet arrives after all real command-output packets. + _send_packet(sock, _MARKER_REQ_ID, SERVERDATA_RESPONSE_VALUE, "") + + chunks: list[str] = [] + while True: + req_id, _, body = _recv_packet(sock) + if req_id == _MARKER_REQ_ID: + break + chunks.append(body) + except (OSError, socket.timeout) as exc: + raise RconError(f"rcon i/o error: {exc}") from exc + finally: + sock.close() + + return "".join(chunks).rstrip() + + def _send_packet(sock: socket.socket, req_id: int, ptype: int, body: str) -> None: body_bytes = body.encode("utf-8") + b"\x00\x00" size = 4 + 4 + len(body_bytes) diff --git a/l4d2web/tests/test_rcon.py b/l4d2web/tests/test_rcon.py index d6c0d41..333c480 100644 --- a/l4d2web/tests/test_rcon.py +++ b/l4d2web/tests/test_rcon.py @@ -19,9 +19,14 @@ import pytest from l4d2web.services.rcon import ( RconAuthError, RconError, + execute_command, query_status, ) +# req_id constants (must match rcon.py) +_EXEC_REQ_ID = 2 +_MARKER_REQ_ID = 0xC0DE + def _pack(req_id: int, ptype: int, body: str) -> bytes: body_bytes = body.encode("utf-8") + b"\x00\x00" @@ -77,6 +82,14 @@ def fake_rcon_server(handler) -> Iterator[int]: raise AssertionError("fake_rcon_server handler raised") from handler_error[0] +def _do_auth_handshake(conn: socket.socket) -> None: + """Receive AUTH packet and respond with the standard two-packet handshake.""" + req_id, ptype, body = _unpack_one(conn) + assert ptype == 3 # SERVERDATA_AUTH + conn.sendall(_pack(req_id, 0, "")) # leading empty type-0 + conn.sendall(_pack(req_id, 2, "")) # auth response — req_id != -1 means OK + + def test_auth_success_then_status(monkeypatch: pytest.MonkeyPatch) -> None: response_body = ( "hostname: Left 4 Dead 2\n" @@ -153,3 +166,138 @@ def test_parse_duration_rejects_malformed_as_rcon_error() -> None: for bad in ["", ":", "abc", "1:", ":5", "1:2:3:4"]: with pytest.raises(RconError): _parse_duration(bad) + + +# --------------------------------------------------------------------------- +# execute_command tests +# --------------------------------------------------------------------------- + + +def test_execute_command_single_packet_reply() -> None: + """Handler sends one body packet then the marker echo; client returns body.""" + + def handler(conn: socket.socket) -> None: + _do_auth_handshake(conn) + + # Receive EXECCOMMAND + exec_id, exec_type, cmd = _unpack_one(conn) + assert exec_type == 2 # SERVERDATA_EXECCOMMAND + assert cmd == "echo hello" + + # Receive the marker packet + marker_id, marker_type, marker_body = _unpack_one(conn) + assert marker_type == 0 # SERVERDATA_RESPONSE_VALUE + assert marker_body == "" + + # Send reply body, then marker echo + conn.sendall(_pack(exec_id, 0, "hello")) + conn.sendall(_pack(marker_id, 0, "")) + + with fake_rcon_server(handler) as port: + result = execute_command("127.0.0.1", port, "letmein", "echo hello", timeout=2.0) + + assert result == "hello" + + +def test_execute_command_multi_packet_reply() -> None: + """Multi-packet replies are concatenated in order.""" + chunk1 = "A" * 3000 + chunk2 = "B" * 3000 + + def handler(conn: socket.socket) -> None: + _do_auth_handshake(conn) + + exec_id, _, _ = _unpack_one(conn) + marker_id, _, _ = _unpack_one(conn) + + conn.sendall(_pack(exec_id, 0, chunk1)) + conn.sendall(_pack(exec_id, 0, chunk2)) + conn.sendall(_pack(marker_id, 0, "")) + + with fake_rcon_server(handler) as port: + result = execute_command("127.0.0.1", port, "letmein", "cvarlist", timeout=2.0) + + assert result == chunk1 + chunk2 + + +def test_execute_command_empty_reply() -> None: + """Server echoes only the marker (e.g. for `say`); result is empty string.""" + + def handler(conn: socket.socket) -> None: + _do_auth_handshake(conn) + + _unpack_one(conn) # exec packet + marker_id, _, _ = _unpack_one(conn) + + conn.sendall(_pack(marker_id, 0, "")) + + with fake_rcon_server(handler) as port: + result = execute_command("127.0.0.1", port, "letmein", "say hi", timeout=2.0) + + assert result == "" + + +def test_execute_command_bad_password() -> None: + """Bad password on auth raises RconAuthError.""" + + def handler(conn: socket.socket) -> None: + req_id, _, _ = _unpack_one(conn) + conn.sendall(_pack(req_id, 0, "")) + conn.sendall(_pack(-1, 2, "")) # bad password sentinel + + with fake_rcon_server(handler) as port: + with pytest.raises(RconAuthError): + execute_command("127.0.0.1", port, "wrong", "status", timeout=2.0) + + +def test_execute_command_timeout() -> None: + """Server hangs; client raises RconError.""" + + def handler(conn: socket.socket) -> None: + import time + time.sleep(3.0) + + with fake_rcon_server(handler) as port: + with pytest.raises(RconError): + execute_command("127.0.0.1", port, "x", "status", timeout=0.3) + + +def test_execute_command_rejects_empty_command() -> None: + with pytest.raises(ValueError): + execute_command("127.0.0.1", 27015, "pw", "") + + +def test_execute_command_rejects_whitespace_only_command() -> None: + with pytest.raises(ValueError): + execute_command("127.0.0.1", 27015, "pw", " ") + + +def test_execute_command_rejects_null_byte_in_command() -> None: + with pytest.raises(ValueError): + execute_command("127.0.0.1", 27015, "pw", "echo\x00hello") + + +def test_execute_command_rejects_oversized_command() -> None: + with pytest.raises(ValueError): + execute_command("127.0.0.1", 27015, "pw", "x" * 1001) + + +def test_execute_command_drains_marker_after_response() -> None: + """Client stops reading exactly after the marker — does not block on a 5th packet.""" + + def handler(conn: socket.socket) -> None: + _do_auth_handshake(conn) + + exec_id, _, _ = _unpack_one(conn) + marker_id, _, _ = _unpack_one(conn) + + # Send one body packet then the marker — nothing else. + conn.sendall(_pack(exec_id, 0, "done")) + conn.sendall(_pack(marker_id, 0, "")) + # Handler returns immediately; if client reads past the marker it would + # block (no more data) and hit a timeout — the 2 s timeout guards us. + + with fake_rcon_server(handler) as port: + result = execute_command("127.0.0.1", port, "letmein", "status", timeout=2.0) + + assert result == "done"