From 4552af654443d5dcc62cfaafd8a22cd40e8fff75 Mon Sep 17 00:00:00 2001
From: mwiegand
Date: Fri, 8 May 2026 11:18:56 +0200
Subject: [PATCH] fix(l4d2-web): keep SSE log stream from pinning gunicorn threads
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

stream_command used a blocking proc.stdout.readline() that never woke
when the underlying journalctl was silent, so Flask never delivered
GeneratorExit on client disconnect — the worker thread and the
journalctl child both leaked permanently and pinned the gunicorn thread
pool.

Switch to a select-based read loop with a 15s heartbeat tick (yielded
as ""), and translate the tick to an SSE keepalive comment in the log
route.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 l4d2web/routes/log_routes.py                 |  5 ++-
 l4d2web/services/host_commands.py            | 29 ++++++++++---
 l4d2web/tests/test_host_commands.py          | 43 ++++++++++++++++++++
 l4d2web/tests/test_status_and_server_logs.py | 17 ++++++++
 4 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/l4d2web/routes/log_routes.py b/l4d2web/routes/log_routes.py
index d4bd89b..5f80eb7 100644
--- a/l4d2web/routes/log_routes.py
+++ b/l4d2web/routes/log_routes.py
@@ -28,6 +28,9 @@ def stream_server_logs(server_id: int) -> Response:
 
     def generate():
         for line in facade.stream_server_logs(server.name, lines=200, follow=True):
-            yield f"data: {line}\n\n"
+            if line == "":
+                yield ": keepalive\n\n"
+            else:
+                yield f"data: {line}\n\n"
 
     return Response(generate(), mimetype="text/event-stream")
diff --git a/l4d2web/services/host_commands.py b/l4d2web/services/host_commands.py
index a925b9e..ab4ffef 100644
--- a/l4d2web/services/host_commands.py
+++ b/l4d2web/services/host_commands.py
@@ -1,5 +1,6 @@
 from dataclasses import dataclass
 import os
+import select
 import signal
 import subprocess
 import sys
@@ -143,19 +144,37 @@ def run_command(
     return result
 
 
-def stream_command(cmd: Sequence[str]) -> Iterator[str]:
+def stream_command(cmd: Sequence[str], *, heartbeat_interval: float = 15.0) -> Iterator[str]:
+    # An empty string yielded between real lines is a heartbeat tick: it lets
+    # SSE callers emit a keepalive frame so a closed peer is detected, instead
+    # of blocking forever inside readline() when the child is silent.
     proc = subprocess.Popen(
         list(cmd),
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
-        text=True,
-        bufsize=1,
+        bufsize=0,
     )
     try:
         if proc.stdout is None:
             return
-        for raw in iter(proc.stdout.readline, ""):
-            yield raw.rstrip("\n")
+        fd = proc.stdout.fileno()
+        buffer = b""
+        while True:
+            ready, _, _ = select.select([fd], [], [], heartbeat_interval)
+            if not ready:
+                if proc.poll() is not None:
+                    break
+                yield ""
+                continue
+            chunk = os.read(fd, 4096)
+            if not chunk:
+                break
+            buffer += chunk
+            while b"\n" in buffer:
+                line, buffer = buffer.split(b"\n", 1)
+                yield line.decode("utf-8", errors="replace")
+        if buffer:
+            yield buffer.decode("utf-8", errors="replace")
     finally:
         if proc.poll() is None:
             proc.terminate()
diff --git a/l4d2web/tests/test_host_commands.py b/l4d2web/tests/test_host_commands.py
index f9a6512..3447676 100644
--- a/l4d2web/tests/test_host_commands.py
+++ b/l4d2web/tests/test_host_commands.py
@@ -72,3 +72,46 @@ def test_stream_command_yields_stdout_lines() -> None:
 
     lines = list(stream_command(["python3", "-c", "print('one'); print('two')"]))
     assert lines == ["one", "two"]
+
+
+def test_stream_command_emits_heartbeat_when_subprocess_silent() -> None:
+    import time
+
+    from l4d2web.services.host_commands import stream_command
+
+    cmd = [
+        "python3",
+        "-c",
+        "import time; time.sleep(0.4); print('done')",
+    ]
+
+    started = time.monotonic()
+    items: list[str] = []
+    for item in stream_command(cmd, heartbeat_interval=0.05):
+        items.append(item)
+        if time.monotonic() - started > 2.0:
+            break
+
+    assert "done" in items, items
+    heartbeats = [i for i in items if i == ""]
+    assert len(heartbeats) >= 2, f"expected ≥2 heartbeat ticks during the silent 0.4s window, got items={items!r}"
+
+
+def test_stream_command_close_releases_subprocess_promptly() -> None:
+    import time
+
+    from l4d2web.services.host_commands import stream_command
+
+    cmd = [
+        "python3",
+        "-c",
+        "import time;\nwhile True:\n time.sleep(60)",
+    ]
+
+    gen = stream_command(cmd, heartbeat_interval=0.05)
+    assert next(gen) == ""
+
+    started = time.monotonic()
+    gen.close()
+    elapsed = time.monotonic() - started
+    assert elapsed < 1.0, f"gen.close() took {elapsed:.2f}s; subprocess cleanup must not block"
diff --git a/l4d2web/tests/test_status_and_server_logs.py b/l4d2web/tests/test_status_and_server_logs.py
index c13942c..978211e 100644
--- a/l4d2web/tests/test_status_and_server_logs.py
+++ b/l4d2web/tests/test_status_and_server_logs.py
@@ -48,6 +48,23 @@ def test_owner_can_stream_server_logs(owner_client_with_server, monkeypatch) ->
     assert response.status_code == 200
 
 
+def test_log_stream_translates_heartbeat_to_sse_keepalive(owner_client_with_server, monkeypatch) -> None:
+    client, server_id = owner_client_with_server
+
+    monkeypatch.setattr(
+        "l4d2web.services.l4d2_facade.stream_server_logs",
+        lambda name, lines=200, follow=True: iter(["first", "", "second"]),
+    )
+
+    response = client.get(f"/servers/{server_id}/logs/stream")
+    assert response.status_code == 200
+    body = response.get_data(as_text=True)
+    assert "data: first\n\n" in body
+    assert "data: second\n\n" in body
+    assert ": keepalive\n\n" in body
+    assert "data: \n\n" not in body
+
+
 def test_status_precedence() -> None:
     from l4d2web.services.status import compute_display_state
 