How to replicate tee behavior in Python when using subprocess?
Asked Answered
P

9

90

I'm looking for a Python solution that will allow me to save the output of a command in a file without hiding it from the console.

FYI: I'm asking about tee (as the Unix command line utility) and not the function with the same name from Python intertools module.

Details

  • Python solution (not calling tee, it is not available under Windows)
  • I do not need to provide any input to stdin for called process
  • I have no control over the called program. All I know is that it will output something to stdout and stderr and return with an exit code.
  • To work when calling external programs (subprocess)
  • To work for both stderr and stdout
  • Being able to differentiate between stdout and stderr because I may want to display only one of the to the console or I could try to output stderr using a different color - this means that stderr = subprocess.STDOUT will not work.
  • Live output (progressive) - the process can run for a long time, and I'm not able to wait for it to finish.
  • Python 3 compatible code (important)

References

Here are some incomplete solutions I found so far:

Diagram http://blog.i18n.ro/wp-content/uploads/2010/06/Drawing_tee_py.png

Current code (second try)

#!/usr/bin/python
from __future__ import print_function

import sys, os, time, subprocess, io, threading
cmd = "python -E test_output.py"

from threading import Thread
class StreamThread ( Thread ):
    def __init__(self, buffer):
        Thread.__init__(self)
        self.buffer = buffer
    def run ( self ):
        while 1:
            line = self.buffer.readline()
            print(line,end="")
            sys.stdout.flush()
            if line == '':
                break

proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdoutThread = StreamThread(io.TextIOWrapper(proc.stdout))
stderrThread = StreamThread(io.TextIOWrapper(proc.stderr))
stdoutThread.start()
stderrThread.start()
proc.communicate()
stdoutThread.join()
stderrThread.join()

print("--done--")

#### test_output.py ####

#!/usr/bin/python
from __future__ import print_function
import sys, os, time

for i in range(0, 10):
    if i%2:
        print("stderr %s" % i, file=sys.stderr)
    else:
        print("stdout %s" % i, file=sys.stdout)
    time.sleep(0.1)
Real output
stderr 1
stdout 0
stderr 3
stdout 2
stderr 5
stdout 4
stderr 7
stdout 6
stderr 9
stdout 8
--done--

Expected output was to have the lines ordered. Remark, modifying the Popen to use only one PIPE is not allowed because in the real life I will want to do different things with stderr and stdout.

Also even in the second case I was not able to obtain real-time like out, in fact all the results were received when the process finished. By default, Popen should use no buffers (bufsize=0).

Phenetidine answered 8/6, 2010 at 11:36 Comment(3)
related: Python subprocess get children's output to file and terminal?Remind
related: Subprocess.Popen: cloning stdout and stderr both to terminal and variablesRemind
Possible duplicate of Python Popen: Write to stdout AND log file simultaneously Voting this way because this is a community wiki :-)Claudication
T
14

If requiring python 3.6 isn't an issue there is now a way of doing this using asyncio. This method allows you to capture stdout and stderr separately but still have both stream to the tty without using threads. Here's a rough outline:

class RunOutput:
    def __init__(self, returncode, stdout, stderr):
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def _stream_subprocess(cmd, stdin=None, quiet=False, echo=False) -> RunOutput:
    if isWindows():
        platform_settings = {"env": os.environ}
    else:
        platform_settings = {"executable": "/bin/bash"}
    if echo:
        print(cmd)
    p = await asyncio.create_subprocess_shell(
        cmd,
        stdin=stdin,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        **platform_settings
    )
    out = []
    err = []

    def tee(line, sink, pipe, label=""):
        line = line.decode("utf-8").rstrip()
        sink.append(line)
        if not quiet:
            print(label, line, file=pipe)

    await asyncio.wait(
        [
            _read_stream(p.stdout, lambda l: tee(l, out, sys.stdout)),
            _read_stream(p.stderr, lambda l: tee(l, err, sys.stderr, label="ERR:")),
        ]
    )

    return RunOutput(await p.wait(), out, err)


def run(cmd, stdin=None, quiet=False, echo=False) -> RunOutput:
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(
        _stream_subprocess(cmd, stdin=stdin, quiet=quiet, echo=echo)
    )

    return result

The code above was based on this blog post: https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

Tautonym answered 8/6, 2010 at 11:36 Comment(1)
I got TypeError: Passing coroutines is forbidden, use tasks explicitly. on await asyncio.wait when I ran this with Python 3.11.6.Galleass
M
12

I see that this is a rather old post but just in case someone is still searching for a way to do this:

proc = subprocess.Popen(["ping", "localhost"], 
                        stdout=subprocess.PIPE, 
                        stderr=subprocess.PIPE)

with open("logfile.txt", "w") as log_file:
  while proc.poll() is None:
     line = proc.stderr.readline()
     if line:
        print "err: " + line.strip()
        log_file.write(line)
     line = proc.stdout.readline()
     if line:
        print "out: " + line.strip()
        log_file.write(line)
Mullet answered 8/6, 2010 at 11:36 Comment(7)
This worked for me, though I found stdout, stderr = proc.communicate() easier to use.Mccahill
-1: This solution leads to a deadlock for any subprocess that can generate enough output on stdout or stderr and where stdout/stderr are not perfectly in sync.Remind
@J.F.Sebastian: True, but you can workaround that problem by replacing readline() with readline(size). I have done something similar in other languages. Ref: docs.python.org/3/library/io.html#io.TextIOBase.readlineAvocet
@Avocet wrong. readline(size) won't fix the deadlock. stdout/stderr should be read concurrently. See links under the question that show solutions using threads or asyncio.Remind
@J.F.SebastianDoes this problem exist if I'm only interested in reading one of the streams?Buchheim
@ThorSummoner: naturally, there is no issue if only one stream is redirected to a pipe.Remind
Is this really guaranteed not to miss any piped stdout? Let's say proc produces two final lines to stdout within the time frame of two subsequent proc.poll() calls: 1. proc.poll() == None -> read single line -> one more line exists in stdout but process is finished -> 2. proc.poll() == returncode and the while loop breaks (while there is still remaining lines in stdout). Also, consider setting stderr to subprocess.STDOUT to avoid deadlocks.Faceless
H
8

This is a straightforward port of tee(1) to Python.

import sys

sinks = sys.argv[1:]
sinks = [open(sink, "w") for sink in sinks]
sinks.append(sys.stderr)
while True:
    input = sys.stdin.read(1024)
    if input:
        for sink in sinks:
            sink.write(input)
    else:
        break

I'm running on Linux right now but this ought to work on most platforms.


Now for the subprocess part, I don't know how you want to 'wire' the subprocess's stdin, stdout and stderr to your stdin, stdout, stderr and file sinks, but I know you can do this:

import subprocess

callee = subprocess.Popen(
    ["python", "-i"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

Now you can access callee.stdin, callee.stdout and callee.stderr like normal files, enabling the above "solution" to work. If you want to get the callee.returncode, you'll need to make an extra call to callee.poll().

Be careful with writing to callee.stdin: if the process has exited when you do that, an error may be rised (on Linux, I get IOError: [Errno 32] Broken pipe).

Hailey answered 8/6, 2010 at 13:9 Comment(7)
This is suboptimal in Linux, since Linux provides an ad-hoc tee(f_in, f_out, len, flags) API, but that's not the point right?Hailey
I updated the question, the problem is that I was not able to find how to use subprocess in order to get the data from the two pipes gradually and not all at once at the end of the process.Phenetidine
I know that your code should work but there is a small requirement that does break the entire logic: I want to be able to distinguish between stdout and stderr and this means that I have to read from both of them but I do not know which will get new data. Please take a look at the example code.Phenetidine
@Sorin, that means you'll have to either use two threads. One reads on stdout, one reads on stderr. If you are going to write both to the same file, you can acquire a lock on the sinks when you start reading and release it after writing a line terminator. :/Hailey
Using threads for this does not sounds too appealing to me, maybe we'll find something else. It's strange that this is a common issue but nobody provided a complete solution for it.Phenetidine
@Hailey I tried the threads a approach but it doesn't work. I updates the question to include the new example.Phenetidine
@Sorin The output you have posted is ordered. You had line1 line3 line5 line7 line9 on stderr, line0 line2 line4 line6 line8 on stdout. Sure, in that run the stderr thread happened to get output first, which meant you had line1 line0 line3 line2 line5 line4... instead of line0 line1 line2 line3 line4 line5... -- but you didn't get line0 line3 line5 line1 line2... or line4 line2 line1 line0 line6... or line0 liline1 line3 linne2 line3e5.... I'm afraid that for a program that has to accept arbitrary input this kind of nondeterminism is unaivoidable if not even necessary.Hailey
O
6

This is how it can be done

import sys
from subprocess import Popen, PIPE

with open('log.log', 'w') as log:
    proc = Popen(["ping", "google.com"], stdout=PIPE, encoding='utf-8')
    while proc.poll() is None:
        text = proc.stdout.readline() 
        log.write(text)
        sys.stdout.write(text)
Ophthalmic answered 8/6, 2010 at 11:36 Comment(3)
For anyone who's wondering, YES you can use print() instead of sys.stdout.write(). :-)Spokesman
@Spokesman print will add an extra newline which is not what you want when you need to faithfully reproduce the output.Vitreous
Yes but print(line, end='') could solve the issueOphthalmic
B
3

Based on the community wiki answer, here is an updated version.

  • Added types
  • Use gather instead of wait (wait gives a warning)
  • Don't unnecessarily decode to str
  • Add timeout.

This is a complete file that you can run; the timeout is set to 5 seconds so it should time out.

NOTE: Python buffers stdout by default so you need to use -u everywhere.

#!/usr/bin/env -S python3 -u

import asyncio
from typing import BinaryIO, Callable, Union
import sys

class RunOutput:
    def __init__(self, exit_code: int, stdout: list[bytes], stderr: list[bytes]):
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr


async def _read_stream(stream: asyncio.StreamReader, callback: Callable[[bytes], None]):
    while True:
        line = await stream.readline()
        if len(line) == 0:
            break
        callback(line)


async def _stream_subprocess(command: list[str]) -> RunOutput:
    p = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout: list[bytes] = []
    stderr: list[bytes] = []

    def tee(line: bytes, sink: list[bytes], out: BinaryIO):
        sink.append(line)
        out.write(line)

    assert p.stdout is not None
    assert p.stderr is not None

    await asyncio.gather(
        _read_stream(p.stdout, lambda l: tee(l, stdout, sys.stdout.buffer)),
        _read_stream(p.stderr, lambda l: tee(l, stderr, sys.stderr.buffer)),
    )

    exit_code = await p.wait()

    return RunOutput(exit_code, stdout, stderr)


def run(command: list[str], timeout: Union[int, float, None]) -> RunOutput:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        asyncio.wait_for(_stream_subprocess(command), timeout)
    )


def main():
    if "--count" in sys.argv:
        import time

        for i in range(10):
            print(f"A stdout {i}")
            print(f"B stderr {i}", file=sys.stderr)
            time.sleep(1)
            print(f"C stderr {i}", file=sys.stderr)
            print(f"D stdout {i}")
            time.sleep(1)
    else:
        run(["python3", "-u", __file__, "--", "--count"], 5)

if __name__ == "__main__":
    main()
Bunk answered 8/6, 2010 at 11:36 Comment(0)
B
1

Starting with a simple example using tee (I'll show you can do this without tee later) you can do the following:

def tee(command, **kwargs):
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs)
    t_out = subprocess.Popen(['tee', '-a', '/dev/stderr'], stdin=p.stdout, stderr=subprocess.PIPE, text=True)
    t_err = subprocess.Popen(['tee', '-a', '/dev/stderr'], stdin=p.stderr, stdout=subprocess.PIPE, text=True)
    return p, t_out, t_err

Here:

  1. Start our command in a subprocess, p, capturing both stderr and stdout.
  2. We start another subprocess, t_out, running tee, and we capture only stderr (allowing stdout to flow out from tee normally)
  3. We do the same for the subprocess t_err, but sending stderr from p and capturing only stdout (allowing stderr to flow normally to stderr)

The end result is that stdout and stderr of your command are output to the terminal normally and also captured in the returned subprocesses.

Suppose a simple program that writes to stderr and stdout:

# test.py
import sys, time
for i in range(10):
    if i % 2 == 0:
        print(i, file=sys.stderr, flush=True)
    else:
        print(i, flush=True)
    time.sleep(0.1)

You could do:

print('starting')
process, t_out, t_err = tee([sys.executable, 'test.py'])
while process.poll() is None:
    time.sleep(0.1)  # wait for process to finish
print('done')
print('stdout:', t_out.stderr.read())
print('stderr:', t_err.stdout.read())

In addition to the program's output, you can see that stdout and stderr are readable by the Python script:

starting
0
1
2
3
4
5
6
7
8
9
done
stdout: 1
3
5
7
9

stderr: 0
2
4
6
8

Without using tee

Note that use of the program tee is not actually necessary. This could just as easily be a pure python program that reads stdin and tees the output the same as tee does.

For example, the following Python script can be used instead. All it does is read stdin and print it to stdout and stderr.

# tee.py
import sys
for line in sys.stdin:
    print(line, file=sys.stdout, flush=True, end='')
    print(line, file=sys.stderr, flush=True, end='')

Then the first example can be modified like this:

def tee(command, **kwargs):
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs)
    t_out = subprocess.Popen([sys.executable, 'tee.py'], stdin=p.stdout, stderr=subprocess.PIPE, text=True)
    t_err = subprocess.Popen([sys.executable, 'tee.py'], stdin=p.stderr, stdout=subprocess.PIPE, text=True)
    return p, t_out, t_err

The end result is similar to the first example, but doesn't require the program tee.


This solution also doesn't necessarily require the use of the additional subprocesses. It's just one way to do it. The same solution could be done in two threads that consume stderr/stdout of the first subprocess.

CAVEAT: some substantial changes would be needed to guarantee correct order of arrival of messages to the terminal if stdout and stderr are written to at about the same time.

Broussard answered 8/6, 2010 at 11:36 Comment(0)
S
0

If you don't want to interact with the process you can use the subprocess module just fine.

Example:

tester.py

import os
import sys

for file in os.listdir('.'):
    print file

sys.stderr.write("Oh noes, a shrubbery!")
sys.stderr.flush()
sys.stderr.close()

testing.py

import subprocess

p = subprocess.Popen(['python', 'tester.py'], stdout=subprocess.PIPE,
                     stdin=subprocess.PIPE, stderr=subprocess.PIPE)

stdout, stderr = p.communicate()
print stdout, stderr

In your situation you can simply write stdout/stderr to a file first. You can send arguments to your process with communicate as well, though I wasn't able to figure out how to continually interact with the subprocess.

Sayre answered 8/6, 2010 at 13:36 Comment(3)
This doesn't show you error messages in STDERR in context of STDOUT, which can make debugging shell-scripts etc nearly impossible.Defrock
Meaning...? In this script anything delivered through STDERR is printed to the screen along with STDOUT. If you're referring to return codes, just use p.poll() to retrieve them.Sayre
This doesn't satisfy the "progressive" condition.Vitreous
P
-1

On Linux, if you really need something like the tee(2) syscall, you can get it like this:

import os
import ctypes

ld = ctypes.CDLL(None, use_errno=True)

SPLICE_F_NONBLOCK = 0x02


def tee(fd_in, fd_out, length, flags=SPLICE_F_NONBLOCK):
    result = ld.tee(
        ctypes.c_int(fd_in),
        ctypes.c_int(fd_out),
        ctypes.c_size_t(length),
        ctypes.c_uint(flags),
    )

    if result == -1:
        errno = ctypes.get_errno()
        raise OSError(errno, os.strerror(errno))

    return result

To use this, you probably want to use Python 3.10 and something with os.splice (or use ctypes in the same way to get splice). See the tee(2) man page for an example.

Precritical answered 8/6, 2010 at 11:36 Comment(0)
O
-2

My solution isn't elegant, but it works.

You can use powershell to gain access to "tee" under WinOS.

import subprocess
import sys

cmd = ['powershell', 'ping', 'google.com', '|', 'tee', '-a', 'log.txt']

if 'darwin' in sys.platform:
    cmd.remove('powershell')

p = subprocess.Popen(cmd)
p.wait()
Ophthalmic answered 8/6, 2010 at 11:36 Comment(1)
Gives an invalid command line error message from ping in MacOS.Vitreous

© 2022 - 2024 — McMap. All rights reserved.