from dataclasses import dataclass import subprocess import sys import threading from typing import Callable, Sequence @dataclass(slots=True) class CommandResult: returncode: int stdout: str stderr: str def run_command( cmd: Sequence[str], *, on_stdout: Callable[[str], None] | None = None, on_stderr: Callable[[str], None] | None = None, passthrough: bool = False, ) -> CommandResult: stdout_lines: list[str] = [] stderr_lines: list[str] = [] proc = subprocess.Popen( list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) def pump( stream, sink: list[str], callback: Callable[[str], None] | None, output_stream, ) -> None: if stream is None: return for raw in iter(stream.readline, ""): line = raw.rstrip("\n") sink.append(line) if callback is not None: callback(line) if passthrough: print(line, file=output_stream) stream.close() stdout_thread = threading.Thread( target=pump, args=(proc.stdout, stdout_lines, on_stdout, sys.stdout), daemon=True, ) stderr_thread = threading.Thread( target=pump, args=(proc.stderr, stderr_lines, on_stderr, sys.stderr), daemon=True, ) stdout_thread.start() stderr_thread.start() returncode = proc.wait() stdout_thread.join() stderr_thread.join() result = CommandResult( returncode=returncode, stdout="\n".join(stdout_lines), stderr="\n".join(stderr_lines), ) if returncode != 0: raise subprocess.CalledProcessError( returncode=returncode, cmd=list(cmd), output=result.stdout, stderr=result.stderr, ) return result