diff --git a/l4d2web/routes/log_routes.py b/l4d2web/routes/log_routes.py index d4bd89b..5f80eb7 100644 --- a/l4d2web/routes/log_routes.py +++ b/l4d2web/routes/log_routes.py @@ -28,6 +28,9 @@ def stream_server_logs(server_id: int) -> Response: def generate(): for line in facade.stream_server_logs(server.name, lines=200, follow=True): - yield f"data: {line}\n\n" + if line == "": + yield ": keepalive\n\n" + else: + yield f"data: {line}\n\n" return Response(generate(), mimetype="text/event-stream") diff --git a/l4d2web/services/host_commands.py b/l4d2web/services/host_commands.py index a925b9e..ab4ffef 100644 --- a/l4d2web/services/host_commands.py +++ b/l4d2web/services/host_commands.py @@ -1,5 +1,6 @@ from dataclasses import dataclass import os +import select import signal import subprocess import sys @@ -143,19 +144,37 @@ def run_command( return result -def stream_command(cmd: Sequence[str]) -> Iterator[str]: +def stream_command(cmd: Sequence[str], *, heartbeat_interval: float = 15.0) -> Iterator[str]: + # An empty string yielded between real lines is a heartbeat tick: it lets + # SSE callers emit a keepalive frame so a closed peer is detected, instead + # of blocking forever inside readline() when the child is silent. proc = subprocess.Popen( list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True, - bufsize=1, + bufsize=0, ) try: if proc.stdout is None: return - for raw in iter(proc.stdout.readline, ""): - yield raw.rstrip("\n") + fd = proc.stdout.fileno() + buffer = b"" + while True: + ready, _, _ = select.select([fd], [], [], heartbeat_interval) + if not ready: + if proc.poll() is not None: + break + yield "" + continue + chunk = os.read(fd, 4096) + if not chunk: + break + buffer += chunk + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) + yield line.decode("utf-8", errors="replace") + if buffer: + yield buffer.decode("utf-8", errors="replace") finally: if proc.poll() is None: proc.terminate() diff --git a/l4d2web/tests/test_host_commands.py b/l4d2web/tests/test_host_commands.py index f9a6512..3447676 100644 --- a/l4d2web/tests/test_host_commands.py +++ b/l4d2web/tests/test_host_commands.py @@ -72,3 +72,46 @@ def test_stream_command_yields_stdout_lines() -> None: lines = list(stream_command(["python3", "-c", "print('one'); print('two')"])) assert lines == ["one", "two"] + + +def test_stream_command_emits_heartbeat_when_subprocess_silent() -> None: + import time + + from l4d2web.services.host_commands import stream_command + + cmd = [ + "python3", + "-c", + "import time; time.sleep(0.4); print('done')", + ] + + started = time.monotonic() + items: list[str] = [] + for item in stream_command(cmd, heartbeat_interval=0.05): + items.append(item) + if time.monotonic() - started > 2.0: + break + + assert "done" in items, items + heartbeats = [i for i in items if i == ""] + assert len(heartbeats) >= 2, f"expected ≥2 heartbeat ticks during the silent 0.4s window, got items={items!r}" + + +def test_stream_command_close_releases_subprocess_promptly() -> None: + import time + + from l4d2web.services.host_commands import stream_command + + cmd = [ + "python3", + "-c", + "import time;\nwhile True:\n time.sleep(60)", + ] + + gen = stream_command(cmd, heartbeat_interval=0.05) + assert next(gen) == "" + + started = time.monotonic() + gen.close() + elapsed = time.monotonic() - started + assert elapsed < 1.0, f"gen.close() took {elapsed:.2f}s; subprocess cleanup must not block" diff --git a/l4d2web/tests/test_status_and_server_logs.py b/l4d2web/tests/test_status_and_server_logs.py index c13942c..978211e 100644 --- a/l4d2web/tests/test_status_and_server_logs.py +++ b/l4d2web/tests/test_status_and_server_logs.py @@ -48,6 +48,23 @@ def test_owner_can_stream_server_logs(owner_client_with_server, monkeypatch) -> assert response.status_code == 200 +def test_log_stream_translates_heartbeat_to_sse_keepalive(owner_client_with_server, monkeypatch) -> None: + client, server_id = owner_client_with_server + + monkeypatch.setattr( + "l4d2web.services.l4d2_facade.stream_server_logs", + lambda name, lines=200, follow=True: iter(["first", "", "second"]), + ) + + response = client.get(f"/servers/{server_id}/logs/stream") + assert response.status_code == 200 + body = response.get_data(as_text=True) + assert "data: first\n\n" in body + assert "data: second\n\n" in body + assert ": keepalive\n\n" in body + assert "data: \n\n" not in body + + def test_status_precedence() -> None: from l4d2web.services.status import compute_display_state