left4me/l4d2host/process.py

138 lines
3.7 KiB
Python

from dataclasses import dataclass
import os
import signal
import subprocess
import sys
import threading
import time
from typing import Callable, Sequence
@dataclass(slots=True)
class CommandResult:
returncode: int
stdout: str
stderr: str
class CommandCancelledError(subprocess.CalledProcessError):
pass
def run_command(
cmd: Sequence[str],
*,
on_stdout: Callable[[str], None] | None = None,
on_stderr: Callable[[str], None] | None = None,
passthrough: bool = False,
should_cancel: Callable[[], bool] | None = None,
cancel_poll_seconds: float = 0.2,
cancel_terminate_timeout: float = 2.0,
) -> CommandResult:
stdout_lines: list[str] = []
stderr_lines: list[str] = []
proc = subprocess.Popen(
list(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
start_new_session=should_cancel is not None,
)
def emit_stderr_message(line: str) -> None:
stderr_lines.append(line)
if on_stderr is not None:
on_stderr(line)
if passthrough:
print(line, file=sys.stderr)
def terminate_process() -> None:
emit_stderr_message("cancellation requested; terminating subprocess")
if should_cancel is not None:
try:
os.killpg(proc.pid, signal.SIGTERM)
except ProcessLookupError:
pass
else:
proc.terminate()
def kill_process() -> None:
emit_stderr_message("subprocess did not exit after cancellation; killing subprocess")
if should_cancel is not None:
try:
os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass
else:
proc.kill()
def pump(
stream,
sink: list[str],
callback: Callable[[str], None] | None,
output_stream,
) -> None:
if stream is None:
return
for raw in iter(stream.readline, ""):
line = raw.rstrip("\n")
sink.append(line)
if callback is not None:
callback(line)
if passthrough:
print(line, file=output_stream)
stream.close()
stdout_thread = threading.Thread(
target=pump,
args=(proc.stdout, stdout_lines, on_stdout, sys.stdout),
daemon=True,
)
stderr_thread = threading.Thread(
target=pump,
args=(proc.stderr, stderr_lines, on_stderr, sys.stderr),
daemon=True,
)
stdout_thread.start()
stderr_thread.start()
cancelled = False
while True:
returncode = proc.poll()
if returncode is not None:
break
if should_cancel is not None and should_cancel():
cancelled = True
terminate_process()
try:
returncode = proc.wait(timeout=cancel_terminate_timeout)
except subprocess.TimeoutExpired:
kill_process()
returncode = proc.wait()
break
time.sleep(cancel_poll_seconds)
stdout_thread.join()
stderr_thread.join()
result = CommandResult(
returncode=returncode,
stdout="\n".join(stdout_lines),
stderr="\n".join(stderr_lines),
)
if cancelled:
raise CommandCancelledError(
returncode=returncode,
cmd=list(cmd),
output=result.stdout,
stderr=result.stderr,
)
if returncode != 0:
raise subprocess.CalledProcessError(
returncode=returncode,
cmd=list(cmd),
output=result.stdout,
stderr=result.stderr,
)
return result