I just published an article with this example of a test app runner that waits on the app's output (regardless of whether that output is on `stdout` or `stderr`) and raises an exception on timeout.
An example test app:
# app/test_app.py
import os
import sys
import time
APP_READY_STRING = os.environ.get("APP_READY_STRING", "No 'App Ready' string provided")
POST_STARTUP_RUN_TIME = 5
STARTUP_DELAY = 5
def main():
    """Simulate an app with a slow startup that announces when it is ready.

    Writes greetings to both stdout and stderr, prints one "waiting" line per
    second for ``STARTUP_DELAY`` seconds, emits ``APP_READY_STRING`` on stderr
    (flushed), then stays alive for ``POST_STARTUP_RUN_TIME`` seconds before
    saying goodbye.
    """
    # write to both streams so a runner can prove it captures each of them
    print("Hello World!")
    print("And hello again to you, sir!", file=sys.stderr)

    # simulated startup delay, one tick per second (ticks are reported 1-based)
    for tick in range(1, STARTUP_DELAY + 1):
        timestamp = time.strftime("%H:%M:%S")
        print(f"{timestamp} Test app waiting... {tick}")
        time.sleep(1)

    # the readiness marker; which stream carries it should not matter to the
    # runner, so stderr is used here on purpose
    print(APP_READY_STRING, file=sys.stderr, flush=True)

    # keep running for a while so that killing the app can be exercised
    time.sleep(POST_STARTUP_RUN_TIME)
    print("Goodbye World!")


if __name__ == "__main__":
    main()
The `app_runner` module:
# app_runner/app_runner.py
import os
import subprocess
import sys
from .processes import kill_process, wait_for_process_output
APP_READY_STRING = "App started successfully..."
class AppRunner(object):
    """Run a Python app in a child process and wait until it reports readiness.

    Intended to be used as a context manager: the app is started (and awaited)
    in the constructor and killed when the ``with`` block exits.
    """

    def __init__(self, app: str, cwd: str):
        """Start ``app`` and block until it prints ``APP_READY_STRING``.

        Args:
            app (str): The name of the app (script) to run with the current
                Python interpreter.
            cwd (str): The path where the app is located; used as the child's
                working directory.

        Raises:
            Exception: If the ready string is not seen in the app's output.
                The child process is killed before the exception propagates,
                and the original error is attached both as the second
                argument and as ``__cause__``.
        """
        self.app = app
        # the child learns which string to announce through its environment
        env = {
            **os.environ,
            "APP_READY_STRING": APP_READY_STRING,
        }
        cmd = [
            sys.executable,
            self.app,
        ]
        # start the app in a separate process. it's important that the stdout and
        # stderr streams are captured so that they can be checked for the expected
        # output, and that the app isn't run with a shell
        self.process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            wait_for_process_output(self.process, APP_READY_STRING)
        except Exception as e:
            # __exit__ never runs when __init__ raises, so clean up here or the
            # half-started child would be left running
            self._reap()
            raise Exception(
                f"AppRunner app '{self.app}' failed to start", e
            ) from e

    def _reap(self) -> None:
        """Make sure the child is dead and collect its exit status."""
        if self.process.poll() is None:
            self.process.kill()
        self.process.wait()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # kill by name first (also catches instances not started through this
        # handle), then reap our own child so it does not linger as a zombie
        kill_process(self.app)
        self._reap()
The `processes` module:
# app_runner/processes.py
import random
import string
import sys
import threading
import time
from typing import List, Union
import psutil
DEFAULT_APP_WAIT_TIME_SEC = 8
# This dictionary is used to store the state of the processes that are being
# searched.
processes = {}
def _is_process_match(command: str, process_names: List[str]) -> bool:
"""Identifying a process by its command line is not an exact science."""
if len(process_names) == 1:
command_parts = command.split(" ")
if command_parts[0] == process_names[0]:
return True
if len(process_names) > 1 and all(
[process_name in command for process_name in process_names]
):
return True
return False
def _find_in_stream(stream, text: str, process_handle: str) -> None:
    """Read ``stream`` line by line until ``text`` shows up or the search ends.

    Every line read is echoed through ``_print_log``. When a line contains
    ``text`` the ``text_found`` event of ``processes[process_handle]`` is set.
    The loop stops when either ``text_found`` or ``timed_out`` is set, when
    the stream reaches end-of-file, or when the stream can no longer be read.

    Args:
        stream: A binary stream with a ``readline()`` method, as set up with
            ``subprocess.PIPE``.
        text: The text to look for in each decoded line.
        process_handle: Key into the module-level ``processes`` dictionary.
    """
    state = processes[process_handle]
    while not state["text_found"].is_set() and not state["timed_out"].is_set():
        try:
            raw_line = stream.readline()
        except (OSError, ValueError):
            # the pipe is broken or was closed under us; nothing more to read
            break
        if not raw_line:
            # readline() returns b"" at end-of-file and would keep doing so
            # forever, so leave instead of spinning until the timeout
            break
        # undecodable bytes must not hide the rest of the line from the search
        line = raw_line.decode("utf-8", errors="replace")
        if text in line:
            state["text_found"].set()
        _print_log(line)
def _print_log(line: str) -> None:
line = f"{time.strftime('%H:%M:%S')} {line}\n"
sys.stderr.write(line)
sys.stderr.flush()
def _print_process_identifier(proc_name: str, cmd_line: str, process_names: List[str]):
return f"process '{proc_name}' (looking for {','.join(process_names)}) with command line '{cmd_line}'"
def _process_timeout(process_handle, timeout=DEFAULT_APP_WAIT_TIME_SEC) -> None:
    """Flag the search identified by ``process_handle`` as timed out.

    Waits up to ``timeout`` seconds for the ``text_found`` event of
    ``processes[process_handle]``. If the event is still unset afterwards,
    the ``timed_out`` event is set so the searching threads stop.

    Args:
        process_handle: Key into the module-level ``processes`` dictionary.
        timeout: Maximum number of seconds to wait.
    """
    _print_log(
        f"Waiting up to {timeout} seconds to abort search on process {process_handle}..."
    )
    state = processes[process_handle]
    # Event.wait returns as soon as the text is found (no one-second polling)
    # and reports False only when the timeout elapsed first
    if not state["text_found"].wait(timeout):
        state["timed_out"].set()
def _random_string(length: int) -> str:
"""Naive random string generator to create process identifiers."""
return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
def wait_for_process_output(
    process, text: str, timeout=DEFAULT_APP_WAIT_TIME_SEC
) -> None:
    """Block until ``text`` appears in the process output or the timeout hits.

    Both ``process.stdout`` and ``process.stderr`` are searched, each in its
    own thread, while a third thread enforces the time limit.

    Args:
        process: A started process exposing readable binary ``stdout`` and
            ``stderr`` streams (e.g. ``subprocess.Popen`` with both set to
            ``subprocess.PIPE``).
        text: The text to look for.
        timeout: Maximum number of seconds to wait.

    Raises:
        Exception: If ``text`` was not found within the time limit.
    """
    start_time = time.time()
    process_handle = _random_string(10)
    state = {
        "text_found": threading.Event(),
        "timed_out": threading.Event(),
    }
    processes[process_handle] = state

    # All helper threads are daemons: a reader blocked in readline() on a
    # silent process must not keep the interpreter alive at exit.

    # start a new thread to stop searching after the timeout
    threading.Thread(
        target=_process_timeout, args=(process_handle, timeout), daemon=True
    ).start()
    # search for the text in the stdout and stderr streams
    for stream in (process.stdout, process.stderr):
        threading.Thread(
            target=_find_in_stream,
            args=(stream, text, process_handle),
            daemon=True,
        ).start()

    # Sleep on the event instead of spinning; wake up regularly to notice a
    # timeout. A found text still takes precedence over a timeout.
    poll_interval = 0.05
    while not state["text_found"].wait(poll_interval):
        if state["timed_out"].is_set():
            raise Exception(
                f"Failed to find '{text}' in process output after {time.time() - start_time} seconds."
            )
def kill_process(process_names: Union[str, List[str]]) -> None:
    """Kill a Python process identified by the given name or list of names.

    There are easier ways to do this, but this is the most reliable way to kill a
    Python-run process without knowing the exact command line arguments and without
    killing the current process / test runner process (eg. nox).

    Every running process is inspected. A process is killed when its name
    matches, or when it runs under a ``python*`` executable and its command
    line (without the interpreter) matches and does not mention ``nox``.
    Processes that vanish, are zombies, or may not be accessed are reported
    on stdout and skipped.

    Args:
        process_names: A single name, or a list of names that must all appear
            in the command line (see ``_is_process_match``).
    """
    if isinstance(process_names, str):
        process_names = [process_names]

    # Kill all processes with the given name
    for proc in psutil.process_iter(attrs=["pid", "name", "cmdline"], ad_value=None):
        # reset for every process so an error message never reports values
        # left over from a previously inspected process
        proc_name = "undefined"
        cmd_line = "undefined"
        try:
            proc_name = proc.name()
            if proc.status() == psutil.STATUS_ZOMBIE:
                continue
            # Some apps run under their own names, some as `Python` (this also
            # depends on the OS)
            if _is_process_match(proc_name, process_names):
                print(f"Killing process with name {proc_name}...")
                proc.kill()
            elif proc_name.lower().startswith("python"):
                full_command = proc.cmdline()
                cmd_line = " ".join(full_command)
                # drop the first argument, which is the python executable
                python_command_parts = full_command[1:]
                if not python_command_parts:
                    # a bare interpreter (e.g. a REPL) has nothing to match
                    # against and must not break the scan
                    continue
                # the initial command part is the last part of the path
                python_command_parts[0] = python_command_parts[0].split("/")[-1]
                # combine the remaining arguments
                command = " ".join(python_command_parts)
                print(
                    f"Evaluating process with name '{proc_name}' and command '{command}'..."
                )
                if (
                    "nox" not in command  # don't kill the test runner process
                    and _is_process_match(command, process_names)
                ):
                    print(
                        f"Killing process with name '{proc_name}' and command '{command}'..."
                    )
                    proc.kill()
        # ZombieProcess is a subclass of NoSuchProcess, so it must come first
        except psutil.ZombieProcess as zp:
            print(
                f"Failed to kill zombie process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(zp)}"
            )
        except psutil.NoSuchProcess as nsp:
            print(
                f"Failed to kill process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(nsp)}"
            )
        except psutil.AccessDenied as ad:
            # processes owned by other users may refuse inspection or signals;
            # skip them rather than aborting the whole scan
            print(
                f"Access denied for {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(ad)}"
            )
Putting it all together:
# test_app_runner.py
from pathlib import Path
from app_runner.app_runner import AppRunner
def main():
    """Start the test app through ``AppRunner`` and report progress on stdout."""
    print("Starting AppRunner test...")

    # the test app lives in the "app" directory next to this script
    script_dir = Path(__file__).parent
    runner = AppRunner(app="test_app.py", cwd=script_dir / "app")

    # constructing the runner already waited for the app; leaving the block
    # kills it again
    with runner:
        print("Test app start detected.")

    print("AppRunner test complete.")


if __name__ == "__main__":
    main()
`subprocess.Popen` to assign the child's stdout to stderr. Usually stderr is unbuffered. – Race

`readline` will block, waiting to receive the next line, as will anything else that reads `sys.stdin`. You can tell Python to make `sys.stdout` unbuffered by specifying the `-u` option on the command line. – Oleander