fix(l4d2-web): keep SSE log stream from pinning gunicorn threads
stream_command used a blocking proc.stdout.readline() that never woke while the underlying journalctl was silent, so Flask never delivered GeneratorExit on client disconnect: the worker thread and the journalctl child both leaked permanently and pinned the gunicorn thread pool.

Switch to a select-based read loop with a 15s heartbeat tick (yielded as ""), and translate the tick to an SSE keepalive comment in the log route.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
ffc4cdbd7d
commit
4552af6544
4 changed files with 88 additions and 6 deletions
|
|
@@ -28,6 +28,9 @@ def stream_server_logs(server_id: int) -> Response:
|
|||
|
||||
def generate():
|
||||
for line in facade.stream_server_logs(server.name, lines=200, follow=True):
|
||||
if line == "":
|
||||
yield ": keepalive\n\n"
|
||||
else:
|
||||
yield f"data: {line}\n\n"
|
||||
|
||||
return Response(generate(), mimetype="text/event-stream")
|
||||
|
|
|
|||
|
|
@@ -1,5 +1,6 @@
|
|||
from dataclasses import dataclass
|
||||
import os
|
||||
import select
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
|
|
@@ -143,19 +144,37 @@ def run_command(
|
|||
return result
|
||||
|
||||
|
||||
def stream_command(cmd: Sequence[str]) -> Iterator[str]:
|
||||
def stream_command(cmd: Sequence[str], *, heartbeat_interval: float = 15.0) -> Iterator[str]:
|
||||
# An empty string yielded between real lines is a heartbeat tick: it lets
|
||||
# SSE callers emit a keepalive frame so a closed peer is detected, instead
|
||||
# of blocking forever inside readline() when the child is silent.
|
||||
proc = subprocess.Popen(
|
||||
list(cmd),
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
bufsize=0,
|
||||
)
|
||||
try:
|
||||
if proc.stdout is None:
|
||||
return
|
||||
for raw in iter(proc.stdout.readline, ""):
|
||||
yield raw.rstrip("\n")
|
||||
fd = proc.stdout.fileno()
|
||||
buffer = b""
|
||||
while True:
|
||||
ready, _, _ = select.select([fd], [], [], heartbeat_interval)
|
||||
if not ready:
|
||||
if proc.poll() is not None:
|
||||
break
|
||||
yield ""
|
||||
continue
|
||||
chunk = os.read(fd, 4096)
|
||||
if not chunk:
|
||||
break
|
||||
buffer += chunk
|
||||
while b"\n" in buffer:
|
||||
line, buffer = buffer.split(b"\n", 1)
|
||||
yield line.decode("utf-8", errors="replace")
|
||||
if buffer:
|
||||
yield buffer.decode("utf-8", errors="replace")
|
||||
finally:
|
||||
if proc.poll() is None:
|
||||
proc.terminate()
|
||||
|
|
|
|||
|
|
@@ -72,3 +72,46 @@ def test_stream_command_yields_stdout_lines() -> None:
|
|||
lines = list(stream_command(["python3", "-c", "print('one'); print('two')"]))
|
||||
|
||||
assert lines == ["one", "two"]
|
||||
|
||||
|
||||
def test_stream_command_emits_heartbeat_when_subprocess_silent() -> None:
|
||||
import time
|
||||
|
||||
from l4d2web.services.host_commands import stream_command
|
||||
|
||||
cmd = [
|
||||
"python3",
|
||||
"-c",
|
||||
"import time; time.sleep(0.4); print('done')",
|
||||
]
|
||||
|
||||
started = time.monotonic()
|
||||
items: list[str] = []
|
||||
for item in stream_command(cmd, heartbeat_interval=0.05):
|
||||
items.append(item)
|
||||
if time.monotonic() - started > 2.0:
|
||||
break
|
||||
|
||||
assert "done" in items, items
|
||||
heartbeats = [i for i in items if i == ""]
|
||||
assert len(heartbeats) >= 2, f"expected ≥2 heartbeat ticks during the silent 0.4s window, got items={items!r}"
|
||||
|
||||
|
||||
def test_stream_command_close_releases_subprocess_promptly() -> None:
|
||||
import time
|
||||
|
||||
from l4d2web.services.host_commands import stream_command
|
||||
|
||||
cmd = [
|
||||
"python3",
|
||||
"-c",
|
||||
"import time;\nwhile True:\n time.sleep(60)",
|
||||
]
|
||||
|
||||
gen = stream_command(cmd, heartbeat_interval=0.05)
|
||||
assert next(gen) == ""
|
||||
|
||||
started = time.monotonic()
|
||||
gen.close()
|
||||
elapsed = time.monotonic() - started
|
||||
assert elapsed < 1.0, f"gen.close() took {elapsed:.2f}s; subprocess cleanup must not block"
|
||||
|
|
|
|||
|
|
@@ -48,6 +48,23 @@ def test_owner_can_stream_server_logs(owner_client_with_server, monkeypatch) ->
|
|||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_log_stream_translates_heartbeat_to_sse_keepalive(owner_client_with_server, monkeypatch) -> None:
|
||||
client, server_id = owner_client_with_server
|
||||
|
||||
monkeypatch.setattr(
|
||||
"l4d2web.services.l4d2_facade.stream_server_logs",
|
||||
lambda name, lines=200, follow=True: iter(["first", "", "second"]),
|
||||
)
|
||||
|
||||
response = client.get(f"/servers/{server_id}/logs/stream")
|
||||
assert response.status_code == 200
|
||||
body = response.get_data(as_text=True)
|
||||
assert "data: first\n\n" in body
|
||||
assert "data: second\n\n" in body
|
||||
assert ": keepalive\n\n" in body
|
||||
assert "data: \n\n" not in body
|
||||
|
||||
|
||||
def test_status_precedence() -> None:
|
||||
from l4d2web.services.status import compute_display_state
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue