feat(l4d2): add callback-capable streaming process runner

This commit is contained in:
mwiegand 2026-04-23 00:54:55 +02:00
parent 7d3cf66ed4
commit 60de361706
No known key found for this signature in database
2 changed files with 101 additions and 0 deletions

View file

@ -0,0 +1,79 @@
from dataclasses import dataclass
import subprocess
import sys
import threading
from typing import Callable, Sequence
@dataclass(slots=True)
class CommandResult:
returncode: int
stdout: str
stderr: str
def run_command(
cmd: Sequence[str],
*,
on_stdout: Callable[[str], None] | None = None,
on_stderr: Callable[[str], None] | None = None,
passthrough: bool = False,
) -> CommandResult:
stdout_lines: list[str] = []
stderr_lines: list[str] = []
proc = subprocess.Popen(
list(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
def pump(
stream: subprocess.Popen[str].stdout,
sink: list[str],
callback: Callable[[str], None] | None,
output_stream,
) -> None:
if stream is None:
return
for raw in iter(stream.readline, ""):
line = raw.rstrip("\n")
sink.append(line)
if callback is not None:
callback(line)
if passthrough:
print(line, file=output_stream)
stream.close()
stdout_thread = threading.Thread(
target=pump,
args=(proc.stdout, stdout_lines, on_stdout, sys.stdout),
daemon=True,
)
stderr_thread = threading.Thread(
target=pump,
args=(proc.stderr, stderr_lines, on_stderr, sys.stderr),
daemon=True,
)
stdout_thread.start()
stderr_thread.start()
returncode = proc.wait()
stdout_thread.join()
stderr_thread.join()
result = CommandResult(
returncode=returncode,
stdout="\n".join(stdout_lines),
stderr="\n".join(stderr_lines),
)
if returncode != 0:
raise subprocess.CalledProcessError(
returncode=returncode,
cmd=list(cmd),
output=result.stdout,
stderr=result.stderr,
)
return result

View file

@ -0,0 +1,22 @@
import subprocess
import pytest
from l4d2host.process import run_command
def test_callbacks_receive_lines() -> None:
out: list[str] = []
err: list[str] = []
run_command(
["python3", "-c", "import sys; print('ok'); print('warn', file=sys.stderr)"],
on_stdout=out.append,
on_stderr=err.append,
)
assert out == ["ok"]
assert err == ["warn"]
def test_nonzero_exit_raises() -> None:
with pytest.raises(subprocess.CalledProcessError):
run_command(["python3", "-c", "import sys; sys.exit(7)"])