Live-output / stream from Python subprocess

I am using Python and its subprocess library to check the output of calls using strace, along the lines of:

subprocess.check_output(["strace", str(processname)]) 

However, this only gives me the output after the called subprocess has already finished, which is very limiting for my use case.

I need a kind of "stream" or live output from the process, i.e. I need to read the output while the process is still running instead of only after it has finished.

Is there a convenient way to achieve this using the subprocess library? I'm thinking of some kind of poll every x seconds, but did not find any hints in the documentation on how to implement this.

Many thanks in advance.

Scrapbook answered 8/1, 2019 at 11:59 Comment(0)

I had some problems applying the selected answer to stream output from a test runner. The following worked better for me:

import subprocess
from time import sleep

def stream_process(process):
    go = process.poll() is None  # True while the child is still running
    for line in process.stdout:  # blocks until the pipe reaches EOF
        print(line.decode(), end="")  # stdout yields bytes; decode before printing
    return go

# cmd is the command to run, e.g. a shell string such as "strace -p <pid>"
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while stream_process(process):
    sleep(0.1)
Libby answered 6/6, 2020 at 15:24 Comment(1)
Just to say this worked perfectly for our use case, thanks for sharing :)Multiphase

As of Python 3.2 (when context manager support was added to Popen), I have found this to be the most straightforward way to continuously stream output from a subprocess:

import subprocess


def run(args):
    with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as process:
        for line in process.stdout:
            print(line.decode('utf8'))  # note: each line already ends with '\n'
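
For reference, a variant of the same loop, assuming Python 3.7+ for the text parameter, which sidesteps manual decoding and the doubled newlines mentioned in the comments below:

import subprocess


def run(args):
    # text=True makes process.stdout yield str lines; end="" avoids printing
    # a second newline on top of the one each line already carries.
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT, text=True) as process:
        for line in process.stdout:
            print(line, end="")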
Doubleness answered 23/4, 2021 at 23:13 Comment(2)
Works under Python 3.8.5. Much more elegant than previous solutions - I wonder if there are any nuances here? One disadvantage relative to a previous .poll()-based approach I was using is that one can't time out in all cases (this method must block indefinitely for a line or EOF) ... but still quite elegantVibrate
I would also rstrip, like so: line.decode("utf8").rstrip("\n"), to avoid adding extra newlines to the output. Otherwise this should be the accepted answer.Ovi

According to the documentation:

Popen.poll()

Check if child process has terminated. Set and return returncode attribute.

So based on this you can:

import subprocess

# Pass the command as a list of arguments; use shell=True instead if you
# need shell syntax such as pipes or semicolons.
process = subprocess.Popen(['your_command_here'], stdout=subprocess.PIPE)
while True:
    output = process.stdout.readline()  # bytes, since text mode is not enabled
    if process.poll() is not None and output == b'':
        break
    if output:
        print(output.strip())
retval = process.poll()  # the exit code of the finished process

This will loop, reading stdout line by line, and display the output in real time.

Note: in Python 3, process.stdout.readline() returns bytes unless text=True is passed to Popen, so the end-of-output check must compare against b'' (as above) rather than ''.
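
If the lines need to be handed to a separate processing thread rather than simply printed (as asked in the comments below), a queue.Queue can decouple the reader from the consumer. A minimal sketch, assuming Python 3.8+ (for the walrus operator) and a hypothetical pid to attach strace to:

import queue
import subprocess
import threading

def reader(process, q):
    # Runs in a background thread: push each line as soon as it arrives.
    for line in process.stdout:
        q.put(line.rstrip("\n"))
    q.put(None)  # sentinel: the process has closed its stdout

q = queue.Queue()
process = subprocess.Popen(
    ["strace", "-p", "1234"],  # "1234" is a hypothetical pid; strace writes its trace to stderr
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
)
threading.Thread(target=reader, args=(process, q), daemon=True).start()

# The consuming thread blocks on q.get() until a line is available.
while (line := q.get()) is not None:
    print(line)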

Kakalina answered 8/1, 2019 at 12:25 Comment(6)
Suppose I don't want to simply print: I have a separate thread relying on the data that gets put out in real time. How would I go about accessing this data as elegantly as possible? Besides that, thank you for your answer :-)Scrapbook
The data which is extracted from the process using the procedure above is meant to be processed in a function running in a parallel thread.Scrapbook
So, instead of printing the output variable you will be feeding it into your function. I'd suggest using a Queue - which will contain the output produced by strace - and have your parallel thread consume data from this queue as soon as they're available. Check this out.Kakalina
One more question: What is the retval = process.poll() for?Scrapbook
I'm guessing subprocess changed since you answered b/c I had to change ... and output == '' to ... and output == b'' because process.stdout.readline() is returning byte string. Otherwise the loop never terminates.Disgruntle
@even with the change suggested by Nathan this does not seem to work well. It works for command='pwd' but it already fails for 'pwd; sleep 5 ; pwd' with the error FileNotFoundError: [Errno 2] No such file or directory: 'pwd ; sleep 4 ; pwd'.Pesade

If you want to treat stdout and stderr separately, you can spawn two threads that handle them concurrently (live as the output is produced).

Adapted from my more detailed answer:

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time."""
    with (  # parenthesized multi-item "with" requires Python 3.10+
        Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        exhaust = partial(deque, maxlen=0)  # collections recipe: exhaust an iterable at C-speed
        exhaust_async = partial(pool.submit, exhaust)  # exhaust non-blocking in a background thread
        exhaust_async(stdout_handler(line[:-1]) for line in process.stdout)
        exhaust_async(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()  # block until both iterables are exhausted (process finished)
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

Call with simple print handlers:

stream_command(["echo", "test"], stdout_handler=print, stderr_handler=print)
# test

Or with custom handlers:

outs, errs = [], []
def stdout_handler(line):
    outs.append(line)
    print(line)
def stderr_handler(line):
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']
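
To see the two streams handled independently, here is a small demo, assuming a POSIX sh is available (the relative order of the two printed lines is not guaranteed, since each stream is drained in its own thread):

stream_command(
    ["sh", "-c", "echo out; echo err >&2"],
    stdout_handler=lambda line: print("OUT:", line),
    stderr_handler=lambda line: print("ERR:", line),
)
# OUT: out
# ERR: err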
Keratoid answered 7/7, 2023 at 7:3 Comment(4)
I am curious. Why do you suggest starting a pool of thread workers to push results of the stream handlers into a zero-sized container? Is it only to avoid using for loops, or what am I missing?Flossi
@Flossi the deque with maxlen=0 is a shortcut to exhaust an iterable (see the itertools recipes). My answer basically does for _ in iterable: pass inside a thread. This way, the stdout_handler and stderr_handler handlers get called asynchronously, live as the lines come into the stdout and stderr buffers.Keratoid
if you're OK with redirecting stderr buffer to stdout_handler, you can avoid the ThreadPoolExecutor and use a single for-loop: https://mcmap.net/q/25534/-run-subprocess-and-print-output-to-loggingKeratoid
if you replace the threadpool with two for loops, only the first for loop will be processing its buffer 'live': only when that first buffer is exhausted (subprocess has finished), the second for loop starts reading from its buffer (not 'live').Keratoid
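
A minimal sketch of the merged-stream variant described in the last two comments, assuming stderr does not need to be kept separate:

from subprocess import PIPE, STDOUT, Popen

def stream_merged(args, handler=print):
    # stderr is folded into stdout, so a single loop drains both streams live.
    with Popen(args, stdout=PIPE, stderr=STDOUT, text=True) as process:
        for line in process.stdout:
            handler(line.rstrip("\n"))
    return process.poll()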
