How to run a process with timeout and still get stdout at runtime
Asked Answered
K

4

7

The need:

  1. Timeout after X seconds, and kill the process (and all the processes it opened) if timeout reached before the process ends gracefully.
  2. Read ongoing output at runtime.
  3. Work with processes that produce output, ones that don't, and ones that produce output, and then stop producing it (e.g. get stuck).
  4. Run on Windows.
  5. Run on Python 3.5.2.

Python 3 subprocess module has timeout built in, and I've also tried and implemented timeout myself using timer and using threads, but it doesn't work with the output. readline() is blocking or not? readlines() is definitely waiting for the process to end before spitting out all the output, which is not what I need (I need ongoing).

I am close to switching to node.js :-(

Keitel answered 6/8, 2016 at 6:20 Comment(3)
The problem with stdout might be in the child process. If the stdout buffer is not flushed then python will never receive the contents (and that would be the same whatever language you used). One possible solution (untested) would be in subprocess.Popen to assign the child's stdout to stderr. Usually stderr is unbuffered.Race
Yes, readline will block, waiting to receive the next line, as will anything else that reads sys.stdin. You can tell Python to make sys.stdout unbuffered by specifying the -u option on the command line.Oleander
No one cares if you switch to node.js...Donothing
D
3

I would use asyncio for this kind of task.

Read IO from the process like in this accepted anwser: How to stream stdout/stderr from a child process using asyncio, and obtain its exit code after?

(I don't want to fully copy it here)

Wrap it in a timeout:

async def killer(trans, timeout):
    await asyncio.sleep(timeout)
    trans.kill()
    print ('killed!!')

trans, *other_stuff = loop.run_until_complete(
                           loop.subprocess_exec(
                                SubprocessProtocol, 'py', '-3', '-c', 'import time; time.sleep(6); print("Yay!")' ,
                                        ) 
                       )

asyncio.ensure_future(killer(trans, 5)) # 5 seconds timeout for the kill
loop.run_forever()

Have fun ...

Diggs answered 10/8, 2016 at 9:8 Comment(3)
Unfortunately, I wasn't able to make the process time out after the 5 seconds you've mentioned.Keitel
@NaphtaliGilead - The process does not timeout, But your call will return and then you can kill itDiggs
Updated to kill the process after 5 secondsDiggs
S
0

Use the 2 python script below.

  • The Master.py will use Popen to start a new process and will start a watcher thread that will kill the process after 3.0 seconds.

  • The slave must call the flush method if no newline in the data written to the stdout, (on windows the '\n' also cause a flush).

Be careful the time module is not a high precision timer.

The load time of the process can be longer than 3.0 seconds in extreme cases (reading an executable from a flash drive having USB 1.0)

Master.py

import subprocess, threading, time

def watcher(proc, delay):
    time.sleep(delay)
    proc.kill()

proc = subprocess.Popen('python Slave.py', stdout = subprocess.PIPE)
threading.Thread(target = watcher, args = (proc, 3.0)).start()

data = bytearray()

while proc:
    chunk = proc.stdout.read(1)
    if not chunk:
        break
    data.extend(chunk)
    print(data)

Slave.py

import time, sys

while True:
    time.sleep(0.1)
    sys.stdout.write('aaaa')
    sys.stdout.flush()
Shoeshine answered 10/8, 2016 at 18:10 Comment(2)
Your example works, but I am not able to run my process and get the output.Keitel
use full path to your executable. example: c:\windows\system32\py.exe d:\Slave.pyShoeshine
N
0

On Python 3.7+, use subprocess.run() with capture_output=True and timeout=<your_timeout>. If the command doesn't return before <your_timetout> seconds pass, it will kill the process and raise a subprocess.TimeoutExpired exception, which will have .stdout and .stderr properties:

import subprocess

try:
    result = subprocess.run(["sleep", "3"], timeout=2, capture_output=True)
except subprocess.TimeoutExpired as e:
    print("process timed out")
    print(e.stdout)
    print(e.stderr)

You might also want to pass text=True (or universal_newlines=True on Python <3.7) so that stdout and stderr are strs instead of bytes.

On older versions of Python you need to replace capture_output=True with stdout=subprocess.PIPE, stderr=subprocess.PIPE, in your call to subprocess.run() and the rest should be the same.

Edit: this isn't what you wanted because you need to wait for the process to terminate to read the output, but this is what I wanted when I came upon this question.

Nolie answered 14/10, 2020 at 22:12 Comment(0)
O
0

I just published this article with this example of a test app runner that waits on the app's output (regardless of whether that output is on stdout or stderr) and raises an exception on timeout.

An example test app:

# app/test_app.py

import os
import sys
import time

APP_READY_STRING = os.environ.get("APP_READY_STRING", "No 'App Ready' string provided")
POST_STARTUP_RUN_TIME = 5
STARTUP_DELAY = 5


def main():
    # log to stdout and stderr in order to test that both can be captured
    print("Hello World!")
    print("And hello again to you, sir!", file=sys.stderr)
    # simulate startup delay
    for i in range(STARTUP_DELAY):
        print(f"{time.strftime('%H:%M:%S')} Test app waiting... {i+1}")
        time.sleep(1)
    # print out the string that's being tested for. it should not matter whether
    # this is printed to stdout or stderr
    print(APP_READY_STRING, flush=True, file=sys.stderr)
    # the app should run for 5 seconds before exiting, this will give enough time
    # to test that killing the app works
    time.sleep(POST_STARTUP_RUN_TIME)
    print("Goodbye World!")

if __name__ == "__main__":
    main()

The app_runner module:

# app_runner/app_runner.py

import os
import subprocess
import sys

from .processes import kill_process, wait_for_process_output

APP_READY_STRING = "App started successfully..."


class AppRunner(object):
    def __init__(self, app: str, cwd: str):
        """This class is used to run an app in a separate process.

        Args:
            app (str): The name of the app to run.
            cwd (str): The path where the app is located."""
        self.app = app
        env = {
            **os.environ,
            "APP_READY_STRING": APP_READY_STRING,
        }
        cmd = [
            sys.executable,
            self.app,
        ]
        # start the app in a separate process. it's important that the stdout and
        # stderr streams are captured so that they can be checked for the expected
        # output, and that the app isn't run with a shell
        self.process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            wait_for_process_output(self.process, APP_READY_STRING)
        except Exception as e:
            raise Exception(
                f"AppRunner app '{self.app}' failed to start", e
            )

    def __enter__(self):
        return self

    def __exit__(self, *args):
        kill_process(self.app)

The processes module:

# app_runner/processes.py

import random
import string
import sys
import threading
import time
from typing import List, Union

import psutil

DEFAULT_APP_WAIT_TIME_SEC = 8

# This dictionary is used to store the state of the processes that are being
# searched.
processes = {}


def _is_process_match(command: str, process_names: List[str]) -> bool:
    """Identifying a process by its command line is not an exact science."""
    if len(process_names) == 1:
        command_parts = command.split(" ")
        if command_parts[0] == process_names[0]:
            return True
    if len(process_names) > 1 and all(
        [process_name in command for process_name in process_names]
    ):
        return True
    return False


def _find_in_stream(stream, text: str, process_handle: str) -> None:
    while (
        not processes[process_handle]["text_found"].is_set()
        and not processes[process_handle]["timed_out"].is_set()
    ):
        try:
            line = stream.readline().decode("utf-8")
            if text in line:
                processes[process_handle]["text_found"].set()
            _print_log(line)
        except Exception:
            pass


def _print_log(line: str) -> None:
    line = f"{time.strftime('%H:%M:%S')} {line}\n"
    sys.stderr.write(line)
    sys.stderr.flush()


def _print_process_identifier(proc_name: str, cmd_line: str, process_names: List[str]):
    return f"process '{proc_name}' (looking for {','.join(process_names)}) with command line '{cmd_line}'"


def _process_timeout(process_handle, timeout=DEFAULT_APP_WAIT_TIME_SEC) -> bool:
    _print_log(
        f"Waiting up to {timeout} seconds to abort search on process {process_handle}..."
    )
    timeout_remaining = timeout
    while (
        timeout_remaining > 0 and not processes[process_handle]["text_found"].is_set()
    ):
        time.sleep(1)
        timeout_remaining -= 1
    if not processes[process_handle]["text_found"].is_set():
        processes[process_handle]["timed_out"].set()


def _random_string(length: int) -> str:
    """Naive random string generator to create process identifiers."""
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


def wait_for_process_output(
    process, text: str, timeout=DEFAULT_APP_WAIT_TIME_SEC
) -> None:
    """This function checks if the given text is in the process output within the given time limit."""
    start_time = time.time()

    process_handle = _random_string(10)
    processes[process_handle] = {
        "text_found": threading.Event(),
        "timed_out": threading.Event(),
    }

    # start a new thread to stop searching after the timeout
    threading.Thread(target=_process_timeout, args=(process_handle, timeout)).start()
    # search for the text in the stdout and stderr streams
    threading.Thread(
        target=_find_in_stream, args=(process.stdout, text, process_handle)
    ).start()
    threading.Thread(
        target=_find_in_stream, args=(process.stderr, text, process_handle)
    ).start()

    while True:
        if processes[process_handle]["text_found"].is_set():
            return
        if processes[process_handle]["timed_out"].is_set():
            raise Exception(
                f"Failed to find '{text}' in process output after {time.time() - start_time} seconds."
            )


def kill_process(process_names: Union[str, List[str]]) -> None:
    """Kill a Python process identified by the given name or list of names.

    There are easier ways to do this, but this is the most reliable way to kill a
    Python-run process without knowing the exact command line arguments and without
    killing the current process / test runner process (eg. nox).
    """
    if isinstance(process_names, str):
        process_names = [process_names]
    proc_name = "undefined"
    cmd_line = "undefined"
    # Kill all processes with the given name
    for proc in psutil.process_iter(attrs=["pid", "name", "cmdline"], ad_value=None):
        try:
            proc_name = proc.name()
            if proc.status() == psutil.STATUS_ZOMBIE:
                continue
            # Some apps run under their own names, some as `Python` (this also
            # depends on the OS)
            if _is_process_match(proc_name, process_names):
                print(f"Killing process with name {proc_name}...")
                proc.kill()
            elif proc_name.lower().startswith("python"):
                # drop the first argument, which is the python executable
                python_command_parts = proc.cmdline()[1:]
                # the initial command part is the last part of the path
                python_command_parts[0] = python_command_parts[0].split("/")[-1]
                # combine the remaining arguments
                command = " ".join(python_command_parts)
                print(
                    f"Evaluating process with name '{proc_name}' and command '{command}'..."
                )
                if (
                    len(cmd_line) > 1
                    and "nox" not in command # don't kill the test runner process
                    and _is_process_match(command, process_names)
                ):
                    print(
                        f"Killing process with name '{proc_name}' and command '{command}'..."
                    )
                    proc.kill()
        except psutil.ZombieProcess as zp:
            print(
                f"Failed to kill zombie process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(zp)}"
            )
        except psutil.NoSuchProcess as nsp:
            print(
                f"Failed to kill process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(nsp)}"
            )

Putting it all together:

# test_app_runner.py

from pathlib import Path

from app_runner.app_runner import AppRunner


def main():
    print("Starting AppRunner test...")
    app = "test_app.py"
    app_location = Path(__file__).parent / "app"
    with AppRunner(app=app, cwd=app_location) as app_runner:
        print("Test app start detected.")
    print("AppRunner test complete.")

if __name__ == "__main__":
    main()
Occlude answered 23/12, 2023 at 15:30 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.