A non-blocking read on a subprocess.PIPE in Python
Asked Answered
L

32

630

I'm using the subprocess module to start a subprocess and connect to its output stream (standard output). I want to be able to execute non-blocking reads on its standard output. Is there a way to make .readline non-blocking or to check if there is data on the stream before I invoke .readline? I'd like this to be portable or at least work under Windows and Linux.

Here is how I do it for now (it's blocking on the .readline if no data is available):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Langtry answered 17/12, 2008 at 17:56 Comment(7)
(Coming from google?) all PIPEs will deadlock when one of the PIPEs' buffer gets filled up and not read. e.g. stdout deadlocks when stderr is filled. Never pass a PIPE you don't intend to read.Gilgilba
@NasserAl-Wohaibi does this mean it's better to always create files then?Sackville
something I've been curious to understand is why it's blocking in the first place... I'm asking because I've seen the comment: To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()Sackville
It is, "by design", waiting to receive inputs.Lime
related: https://mcmap.net/q/25511/-interactive-input-output-using-python/240515Hesperus
Unbelievable that 12 years on this isn't part of python itself :(Foliated
FWIW, Popen.communicate does this with a version of select: github.com/python/cpython/blob/…Victor
W
495

fcntl, select, asyncproc won't help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    print(line)  # ... or do something else with line here
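
If you also need to detect when the child is done (see the comment discussion below about cleanup), one variant is to enqueue a sentinel when the pipe closes. A sketch, reusing the names from the code above:

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()
    queue.put(None)  # sentinel: the pipe is closed, no more output is coming

# drain the queue after the process exits, stopping at the sentinel
while True:
    line = q.get()
    if line is None:
        break
    print(line)  # ... do something with line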
Wast answered 4/2, 2011 at 9:14 Comment(47)
Yes this works for me, I removed a lot though. It includes good practices but not always necessary. Python 3.x/2.x compat and close_fds may be omitted, it will still work. But just be aware of what everything does and don't copy it blindly, even if it just works! (Actually the simplest solution is to use a thread and do a readline as Seb did, Queues are just an easy way to get the data, there are others, threads are the answer!)Lashkar
Inside the thread, the call to out.readline blocks the thread, and main thread, and I have to wait until readline returns before everything else continues. Any easy way around that? (I'm reading multiple lines from my process, which is also another .py file that's doing DB and things)Kneehigh
@Justin: 'out.readline' doesn't block the main thread it is executed in another thread.Wast
close_fds is definitely not something you'd want to copy blindly into your application...Adrell
what if I fail to shut down the subprocess, eg. due to exceptions? the stdout-reader thread won't die and python will hang, even if the main thread exited, isn't it? how could one work around this? python 2.x doesn't support killing the threads, what's worse, doesn't support interrupting them. :( (obviously one should handle the exceptions to assure the subprocess is shut down, but just in case it won't, what can you do?)Ramonaramonda
@naxa: notice daemon=True: python process won't hang if the main thread is exited.Wast
I've created some friendly wrappers of this in the package shelljob pypi.python.org/pypi/shelljobPathoneurosis
I started the Popen in a separate thread, so I can 'busy wait' within this thread, using: .... t.start() while p.poll() == None: time.sleep(0.1) In this case my GUI will not block (I'm using TKinter). So I can also use parent.after(100, self.consume) to simulate an event kind of polling. In the consume method I eventually use the q.get() method to retrieve data from the queue. works like a charm! Although some people say that you can have a dead-lock using wait() or poll() in combination with stdout PIPE???Divisor
@danger89: yes. You can deadlock if your parent is blocked on .wait() but your child waits for you to read its output (not your case but q.get() in GUI callback is also incorrect). The simplest option is to use .communicate() in a separate thread. Here's a couple of code example that shows how you could read output from a subprocess without "locking" GUI: 1. with threads 2. no threads (POSIX). If something is unclear, askWast
what is the purpose of close_fds here?Ammerman
@dashesy: to avoid leaking parent's file descriptors. The demonstrated behavior is the default on Python 3.Wast
@J.F.Sebastian: makes sense. This is the only solution that does not block; it seems on 2.7 there is no other way (tried select, fcntl)! Any idea why I cannot get the output from a sgid process? It works just fine in a terminal but here I get no output yet, which cannot be.Ammerman
@dashesy: select, fcntl should work on POSIX systems. If you don't understand why "no output yet" is always a possibility for the given solution; ask a new question. Make sure to read How to Ask and stackoverflow.com/help/mcveWast
@J.F.Sebastian sorry for my desperate attempt, I thought you might know the answer already, you look like you do :) I will ask a new question next time. BTW, after using setbuf(stdout, NULL) in the executable it worked like a charm (most methods worked, even select and fcntl), I have no idea why \n did not flush stdout but I will not be surprised if it has something to do with it being suid and selinux.Ammerman
@dashesy: if the subprocess' stdout is redirected to a pipe then it is block-buffered (it is line-buffered if stdout is a terminal (tty)) for stdio-based programs written in C. See Python C program subprocess hangs at “for line in iter”Wast
@J.F.Sebastian did not know about this, naively I always assumed the behavior as seen in tty, but now it makes sense. So it means smart applications should look at the type of stdout, if they produce things that are lines and could be used in pipes/grep.Ammerman
any reason you're doing out.close() in enqueue_output? shouldnt that be the job of the Popen object?Hedva
@zaphod: it is done for the same reason with-statement is used for ordinary files: to avoid relying on hard-to-reason-about garbage collection to free the resources i.e., even if p.stdout is closed when Popen() is garbage-collected: I prefer the explicit simple and deterministic out.close() (though I should've used with out: in the thread instead -- it seems to work on Python 2.7 and Python 3).Wast
This solution will not work well on many-core systems (16-48 core systems). The GIL comes into play in context switching.Romilda
Better to use nonblocking IO as described below.Romilda
@AntonMedvedev: 1. it doesn't matter how many CPUs there are. The question is about I/O (Python releases GIL during blocking I/O operations). 2. There was no alternative to threads in stdlib for a portable solution at the time of the answer. And it is debatable whether nonblocking IO is better (unconditionally) than threads. A sensible approach is to research the tradeoffs in your particular case.Wast
you write ' Windows and Linux', does this exclude OSX? I just tried it on OSX with executing ffmpeg which seems to have problems. I am going to test this more in detail, except if you tell me that this cannot work in OSX.Elias
@P.R.: OP asks about these OSes that is why they are mentioned explicitly. It should work on OS X too.Wast
@nights: "Doesnt work for me" is not very informative. Create a minimum code example, describe using words what do you expect to get and what do you get instead step by step, what is you OS, Python version and post it as a separate question.Wast
Do not switch out threading for multiprocessing and multiprocessing.Queue with this answer. Termination of the subprocess caused the stdout cursor to move to the start of the screen for me.Michalmichalak
@Wast must non-blocking I/O be asynchronous?Lisettelisha
@Lisettelisha I'm not sure what you are asking. These concepts are closely related. The answer shows how to implement a non-blocking read on top of the synchronous blocking readline() call. The interface is also asynchronous (other things may happen while IO is performed).Wast
I feel like I stumble over this thread once a month or so... I still have no idea why the claim "fcntl wont help in this case". I must have implemented this 10-20 times by now using fnctl to set os.O_NONBLOCK (with os.read(), and not readline()). It generally seems to work as expected.Launder
@Launder does Windows support fcntl? Does readline() on Python 2 support non-blocking mode?Wast
@Wast not sure about windows... do you know? is that why you said it wont help? Also note that I said with os.read() and not readline().Launder
os.read() is available on Windows, but os.O_NONBLOCK is documented as being Unix specific, so I would not expect it to work there.Cavafy
When I tried the logic provided in the answer, I was unable to get the stderr dataAlesiaalessandra
Looks like the stderr data is also going to stdoutAlesiaalessandra
@Alesiaalessandra no, stdout in the answer goes to the pipe. stderr does not go to the pipe here. Your code is another matter.Wast
is there a reason why docs.python.org/3/library/asyncio-api-index.html library is not mentioned or used? Is it not good for this?Sackville
@CharlieParker scroll down to my other answerWast
Don't know about earlier python versions, but with 3.7 and 3.8, this method is lossy. Run a couple of times something like 'cat /var/log/bigfile' with shell=True and sometimes you get the whole file output, sometimes you get partial results.Marrilee
@Orsiris de Jong what specific code did you run? it can end the reading prematurely (there is no condition on when to stop in the answer). Have you tested 3.9? (perhaps, the code fails regardless Python version)Wast
@Wast Thanks for the fast answer. Here's the simplest example I came up with. gist.github.com/deajan/7f4d500883a26eed2acf9f6ece60deef This was done under Windows, but basically Linux result is the same. Also line buffering doesn't isn't supported on binary streams anymore, perhaps that's the reason (eg C:\Python38-64\lib\subprocess.py:844: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used self.stdout = io.open(c2pread, 'rb', bufsize))Marrilee
@Orsiris de Jong I would start with a proper clean up: process ended/pipe is closed, queue is exhaustedWast
@Wast I did try another queue read after the while has finished. In the end. Also tried a last read after process has finished. Never got a working solution. Is there a chance you can modify my gist to make it work ? So far, the only real reliable way I found was using process.communicate() with a thread.Marrilee
@OrsirisdeJong: a simple change I'd try is to put None after the out pipe is closed in enqueue_output(). Then you could read until you get None from the queue.Wast
@Wast Using None as EOF character for the queue works unless Popen argument encoding is used, then we have an infinite iterator since we don't use binary reads. I've updated my gist with a working example which is tested on Python versions 2.7 to 3.9 on Windows & Linux, see gist.github.com/deajan/…. Maybe your answer could include the text/bytes stuff and None after out since those are headache savers ;)Marrilee
End result is visible in python package command_runner at github.com/netinvent/command_runner where I use two different methods to get subprocess.Popen output in a non blocking way, one being based on the answer above, both methods fully working and tested on MSWin & Linux and multiple python versions. @jfs, Thanks for your time.Marrilee
@OrsirisdeJong: note: I've suggested None as a debugging tool (if it works, then it means the non-None variant doesn't do a proper cleanup). I'm not sure there exists readable, easy to understand/adapt and at the same time general approach (though it should be easy in each specific case). [I don't know whether I find focus long enough to attempt updating the answer for the latest Python 3 and/or general case cleanup.]Wast
t.daemon = True saved my life on a related problem. Don't knock "overcomplete" solutionsColyer
I wrapped this for streams in general in a purpose-built package at github.com/xloem/nonblocking_stream_queue . pip install nonblocking-stream-queue. I had not yet noticed that shelljob also wraps it, although shelljob requires a subprocess where my wrap focuses on streams. This approach also does not use a daemon thread, which throws errors when standard streams are used, instead terminating when the parent thread terminates. There is also a quick blocking function provided to wait for data availability.Distaff
M
91

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: line = sys.stdin.readline()
      except IOError: continue  # no data available yet (EAGAIN)
      handleInput(line)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX...
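
As noted in the comments below, readline() is not guaranteed to cope with a non-blocking file on Python 2; a more robust variant is the lower-level os.read() plus splitting lines yourself. A sketch of that variant (POSIX only; mainThreadIsRunning and handleInput are the placeholders from above):

import errno
import fcntl
import os
import sys

fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

buf = b''
while mainThreadIsRunning:
    try:
        chunk = os.read(fd, 1024)  # raises EAGAIN instead of blocking
    except OSError as e:
        if e.errno == errno.EAGAIN:
            continue  # nothing to read yet
        raise
    buf += chunk
    while b'\n' in buf:
        line, buf = buf.split(b'\n', 1)
        handleInput(line)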

Minuteman answered 27/11, 2009 at 21:33 Comment(9)
According to the docs, fcntl() can receive either a file descriptor, or an object that has .fileno() method.Murrey
Jesse's answer is not correct. According to Guido, readline doesn't work correctly with non-blocking mode, and it won't before Python 3000. bugs.python.org/issue1175#msg56041 If you want to use fcntl to set the file to non-blocking mode, you have to use the lower-level os.read() and separate out the lines yourself. Mixing fcntl with high-level calls that perform line buffering is asking for trouble.Pockmark
The use of readline seems incorrect in Python 2. See anonnn's answer #375927Primate
Please, don't use busy loops. Use poll() with a timeout to wait for the data.Stag
@Stefano what's buffer_size defined as?Cassilda
@Cassilda : your choice, I usually have 1024 - it's the amount of bytes to be read in one go so set it smaller or larger according to your expected data size!Ulcer
@Ulcer Yes, I hadn't realised it could be any arbitrary literalCassilda
is there a reason why docs.python.org/3/library/asyncio-api-index.html library is not mentioned or used? Is it not good for this?Sackville
Just change readline() to read, then add some logic to split and yield. It works great!Banshee
D
72

On Unix-like systems and Python 3.5+ there's os.set_blocking which does exactly what it says.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

This outputs:

0 b''
1 b'0\n'
0 b''
1 b'1\n'
0 b''
1 b'2\n'
0 b''
1 b'3\n'
0 b''
1 b'4\n'

With os.set_blocking commented it's:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Despoil answered 11/12, 2019 at 17:51 Comment(7)
This is by far the most elegant solution, thanks for making my day (night actually ^^)Verdieverdigris
Very elegant, and very efficient. Thanks for this solution, it works perfectly!Aletheaalethia
Thank you! This works great when using the Popen pipes with a Selector, to make sure it won't ever block.Spiderwort
Elegant, yes. But it's not multiplatform (per the question).Malnutrition
Weird, I get all b'' (all empty strings) for some reason. Ubuntu 20.04, Python3.10.Chasten
Does readline() guarantee the support for non-blocking underlying file descriptor? (it didn't in Python 2)Wast
Windows users have to update their Python to 3.12, lower version on Windows does not provide this. My conda update only give me Python 3.11.5, so sad.Shanika
W
50

Python 3.4 introduces new provisional API for asynchronous IO -- asyncio module.

The approach is similar to the twisted-based answer by @Bryan Ward -- define a protocol, and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See "Subprocess" in the docs.

There is a high-level interface asyncio.create_subprocess_exec() that returns Process objects, which allow reading a line asynchronously using the StreamReader.readline() coroutine (with async/await Python 3.5+ syntax):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

  • start subprocess, redirect its stdout to a pipe
  • read a line from subprocess' stdout asynchronously
  • kill subprocess
  • wait for it to exit

Each step could be limited by timeout seconds if necessary.
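
For example, a sketch of bounding the read step with asyncio.wait_for() (the 5-second value here is arbitrary; asyncio.TimeoutError is raised if it expires):

# instead of the async-for loop inside readline_and_kill():
line = await asyncio.wait_for(process.stdout.readline(), timeout=5.0)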

Wast answered 20/12, 2013 at 5:50 Comment(5)
When I try something like this using python 3.4 coroutines, I only get output once the entire script has run. I'd like to see a line of output printed, as soon as the subprocess prints a line. Here's what I've got: pastebin.com/qPssFGep.Idellaidelle
@flutefreak7: buffering issues are unrelated to the current question. Follow the link for possible solutions.Wast
thanks! Solved the problem for my script by simply using print(text, flush=True) so that the printed text would be immediately available to the watcher calling readline. When I tested it with the Fortran-based executable I actually want to wrap/watch, it doesn't buffer it's output, so it behaves as expected.Idellaidelle
Is it possible to allow the subprocess to persist and perform further read/write operations. readline_and_kill, in your second script, works very much like subprocess.comunicate in that it terminates the process after one read/write operation. I also see that you're using a single pipe, stdout, which subprocess handles as non-blocking. Trying to use both stdout and stderr I find I end up blocking.Beaird
@Beaird the code in the answer works as intended as described in the answer explicitly. It is possible to implement other behavior if desired. Both pipes are equally nonblocking if used, here's an example how to read from both pipes concurrently.Wast
D
19

Try the asyncproc module. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

The module takes care of all the threading as suggested by S.Lott.

Donahue answered 13/1, 2009 at 3:36 Comment(7)
Absolutely brilliant. Much easier than the raw subprocess module. Works perfectly for me on Ubuntu.Bedspring
asyncproc doesn't work on windows, and windows doesn't support os.WNOHANG :-(Warship
asyncproc is GPL, which further limits its use :-(Warship
Thanks. One small thing: It seems that replacing tabs with 8 spaces in asyncproc.py is the way to go :)Intosh
It doesn't look like you can get the return code of the process that you launched though via asyncproc module; only the output that it generated.Barque
"asyncproc is GPL, which further limits its use :-(", are you proprietary software supporter? @BryanOakleyLaticialaticiferous
@bsound: Yes, companies should be able to write proprietary software. But regardless of my own personal beliefs, it is a fact that GPL limits the use of software under that license. Many companies don't allow the use of GPL software due to its viral nature. By definition that means there are limits to how GPL software can be used. That's not a political statement, that's just a fact.Warship
A
17

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The Twisted documentation has some good information on this.

If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn't built on top of Twisted, this isn't really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn't applicable for your particular application.

Attainable answered 21/4, 2011 at 21:54 Comment(4)
no good. select should not work on windows with file descriptors, according to docsRamonaramonda
@naxa I don't think the select() he's referring to is the same one you are. I'm assuming this because Twisted works on windows...Kepner
I've added similar solution based on asyncio from stdlib.Wast
"Twisted (depending upon the reactor used) is usually just a big select() loop" means there are several reactors to choose between. The select() one is the most portable one on unixes and unix-likes, but there are also two reactors available for Windows: twistedmatrix.com/documents/current/core/howto/…Danube
H
17

Things are a lot better in modern Python.

Here's a simple child program, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

And a program to interact with it:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

That prints out:

b'hello bob\n'
b'hello alice\n'

Note that the actual pattern, which is also used by almost all of the previous answers, both here and in related questions, is to set the child's stdout file descriptor to non-blocking and then poll it in some sort of select loop. These days, of course, that loop is provided by asyncio.
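
A minimal sketch of that underlying pattern without asyncio (POSIX only; assumes proc is a subprocess.Popen created with stdout=subprocess.PIPE):

import os
import select

os.set_blocking(proc.stdout.fileno(), False)
while proc.poll() is None:
    ready, _, _ = select.select([proc.stdout], [], [], 0.1)  # 100 ms poll
    if ready:
        data = proc.stdout.read()  # returns immediately in non-blocking mode
        if data:
            print(data)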

Hesperus answered 9/5, 2019 at 2:12 Comment(4)
imo this is the best answer, it actually uses Windows overlapped/async read/writes under the hood (versus some variation of threads to handle blocking). Per the docs, you should call drain() to ensure write(..) actually goes throughShoreward
I got error from it: builtins.ValueError: set_wakeup_fd only works in main thread I'm doing this from within a QThread (PyQt5)Judah
Works excellently on Windows. Solves the limitation with PowerShell pipelines that pass along the output only once the source exits, not chunk by chunk.Katalin
Your solution worked for me on Mac using Python 3.11.Spline
C
13

Use select & read(1).

import select
import subprocess     # no new requirements beyond the stdlib

def readAllSoFar(proc, retVal=''):
  while select.select([proc.stdout], [], [], 0)[0] != []:
    retVal += proc.stdout.read(1)
  return retVal

p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while p.poll() is None:
  print(readAllSoFar(p))

For readline()-like:

lines = ['']
while p.poll() is None:
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print(lines[a])
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print(lines[a])
Criseyde answered 26/1, 2011 at 1:2 Comment(3)
no good. select should not work on windows with file descriptors, according to docsRamonaramonda
OMG. Read megabytes, or possibly gigabytes one character at a time... that is the worst idea I've seen in a long time... needless to mention, this code doesn't work, because proc.stdout.read() no matter how small the argument is is a blocking call.Stamp
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failedMonaural
M
9

Here is a simple solution based on threads which:

  • works on both Linux and Windows (not relying on select).
  • reads both stdout and stderr asynchronously.
  • doesn't rely on active polling with arbitrary waiting time (CPU friendly).
  • doesn't use asyncio (which may conflict with other libraries).
  • runs until the child process terminates.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen(['python', 'printer.py'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)  # list form works on POSIX too
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Mccahill answered 20/3, 2020 at 15:41 Comment(2)
Seems to work great. I've noticed that if using a c++ exe to connect to, I've needed to call fflush(stdout) after any printfs to stdout to get things to work on Windows. Not needed for stderr.Harneen
C++ standard library streams (except stderr) are buffered by default. If you are doing any sort of interactivity (other than console or file) you need to flush to immediatelly see effects on the other side.Altercation
S
8

Here is my code, used to catch every output from the subprocess ASAP, including partial lines. It pumps both stdout and stderr at the same time, in almost correct order.

Tested and working correctly on Python 2.7, on Linux & Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
Schoolmistress answered 11/6, 2013 at 19:9 Comment(3)
One of the few answers which allow you to read stuff that does not necessarily end with a newline.Hennahane
While your solution is the closest I get to no missing input, running something like 'cat /some/big/file' hundreds of times in a row with the above code and comparing each output with the last one will show differences and end up with some (rare) times where the whole output couldn't be caught.Marrilee
Hmmm.. Not the whole file -- because something at the beginning is missing (i.e. it sent data before io.open for it was done), or because something is missing at the end of the file (exit before draining all input)?Schoolmistress
P
7

One solution is to have another process perform your read of the process, or to make a thread with a timeout.

Here's the threaded version of a timeout function:

http://code.activestate.com/recipes/473878/
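
A sketch of the thread-with-timeout idea applied here (assuming p is the Popen object from the question; the worker thread may stay blocked in readline, but the caller no longer is):

import threading

result = []
t = threading.Thread(target=lambda: result.append(p.stdout.readline()))
t.daemon = True      # don't keep the interpreter alive for a stuck read
t.start()
t.join(timeout=1.0)  # wait up to one second for a line
output_str = result[0] if result else None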

However, do you need to read the stdout as it's coming in? Another solution may be to dump the output to a file and wait for the process to finish using p.wait().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
Petersen answered 17/12, 2008 at 18:16 Comment(1)
seems like the recipe's thread would not exit after timeout and killing it depends on being able to kill the subprocess (something otherwise unrelated in this regard) it reads (a thing you should be able to do, but just in case you can't..).Ramonaramonda
C
7

Disclaimer: this works only for tornado

You can do this by setting the fd to be nonblocking and then use ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess and you can install it via PyPI:

easy_install tornado_subprocess

now you can do something like this:

import tornado_subprocess
import tornado.ioloop

def print_res( status, stdout, stderr ):
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

you can also use it with a RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Cascabel answered 6/7, 2012 at 12:42 Comment(5)
Thanks for the nice feature! Just to clarify, why can't we simply use threading.Thread for creating new non-blocking processes? I used it in on_message of Tornado websocket instance, and it did the job fine.Filing
threading is mostly discouraged in tornado. they are fine for small, short running functions. You can read about it here: https://mcmap.net/q/25512/-tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrencyCascabel
@VukasinToroman you really saved me here with this. thank you so much for the tornado_subprocess module :)Tawny
does this work on windows? (note that select, with file descriptors, does not)Ramonaramonda
This lib does not use the select call. I haven't tried this under Windows but you would probably run into trouble since the lib is using the fcntl module. So in short: no this probably will not work under Windows.Cascabel
L
7

Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() is not None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() is None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Why existing solutions did not work:

  1. Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
  2. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
  3. Using select.poll() is neat, but doesn't work on Windows according to python docs.
  4. Using third-party libraries seems overkill for this task and adds additional dependencies.
Listing answered 15/3, 2013 at 4:40 Comment(6)
1. q.get_nowait() from my answer must not block, ever, that is the point of using it. 2. The thread that executes readline (enqueue_output() function) exits on EOF e.g., including the case when the output-producing process is killed. If you believe it is not so; please, provide a complete minimal code example that shows otherwise (maybe as a new question).Wast
@sebastian I spent an hour or more trying to come up with a minimal example. In the end I must agree that your answer handles all the cases. I guess it didn't work earlier for me because when I was trying to kill the output-producing process, it was already killed and gave a hard-to-debug error. The hour was well spent, because while coming up with a minimal example, I could come up with a simpler solution.Listing
Could you post the simpler solution, too? :) (if it's different from Sebastian's)Ramonaramonda
@danger89: I think dcmpid = myprocess .Coastland
In the condition after the read() call (just after while True): out will never be an empty string, because you read at least a string/bytes of length 1.Anew
It's quite inefficient to read bytes one by one with large outputs...Cognation
B
6

I had this problem and needed to read some subprocess.Popen stdout. Here is my non-blocking read solution:

import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except (IOError, OSError):  # no data available yet
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Bigmouth answered 21/4, 2011 at 20:51 Comment(2)
fcntl doesn't work on windows, according to the docs.Ramonaramonda
@anatolytechtonik use msvcrt.kbhit() insteadCassilda
C
4

This version of non-blocking read doesn't require special modules and will work out-of-the-box on the majority of Linux distros.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break
            sys.stdout.write(ch)
        except OSError:
            # waiting for data to be available on fd
            pass

def shell(args, run_async=True):  # note: 'async' became a reserved word in Python 3.7+
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if run_async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Cassino answered 24/5, 2016 at 15:55 Comment(0)
M
4

Not the first and probably not the last, I have built a package that does non blocking stdout PIPE reads with two different methods, one being based on the work of J.F. Sebastian (@jfs)'s answer, the other being a simple communicate() loop with a thread to check for timeouts.

Both stdout capture methods are tested to work under Linux and Windows, with Python versions from 2.7 to 3.9 as of the time of writing.

Being non blocking, it guarantees timeout enforcement, even with multiple child and grandchild processes, and even under Python 2.7.

The package also handles both bytes and text stdout encodings, which is a nightmare when trying to catch EOF.

You'll find the package at https://github.com/netinvent/command_runner

If you need some well tested non blocking read implementations, try it out (or hack the code):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world', shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

You can find the core non blocking read code in _poll_process() or _monitor_process() depending on the capture method employed. From there, you can hack your way to what you want, or simply use the whole package to execute your commands as a subprocess replacement.

Marrilee answered 8/9, 2021 at 9:3 Comment(2)
Amazing, this is what I needed! Thank you so much for making this package and it also being public! I looked for hours for a solution like thatMelyndamem
Thanks for the feedback, always good to know when these packages can help someone ;)Marrilee
T
3

I have the original questioner's problem, but did not wish to invoke threads. I mixed Jesse's solution with a direct read() from the pipe, and my own buffer-handler for line reads (however, my sub-process - ping - always wrote full lines < a system page size). I avoid busy-waiting by only reading in a gobject-registered io watch. These days I usually run code within a gobject MainLoop to avoid threads.

import os
import fcntl
import gobject
import subprocess

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

The watcher is

def watch(f, *other):
    print 'reading',f.read()
    return True

And the main program sets up a ping and then runs the gobject main loop.

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

Any other work is attached to callbacks in gobject.
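
The same watch-and-callback idea can also be expressed with the stdlib selectors module if you'd rather avoid the gobject dependency. A minimal POSIX-only sketch:

import selectors
import subprocess

sel = selectors.DefaultSelector()
p = subprocess.Popen(['/bin/ping', '192.168.1.8'], stdout=subprocess.PIPE)
sel.register(p.stdout, selectors.EVENT_READ)

while p.poll() is None:
    for key, _ in sel.select(timeout=1.0):
        # read1 takes only what is already available, so it won't
        # block once select has reported the pipe readable
        print('reading', key.fileobj.read1(4096))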

Tyra answered 26/9, 2018 at 12:33 Comment(1)
selectors is part of the python stdlib and can replace gobject easily.Plunge
C
2

The select module helps you determine where the next useful input is.

However, you're almost always happier with separate threads. One does a blocking read on stdin; the other does whatever it is you don't want blocked.
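
A sketch of the select approach (POSIX pipes only; p is the Popen object from the question):

import select

ready, _, _ = select.select([p.stdout], [], [], 0.5)  # wait up to 0.5 s
if ready:
    # note: readline() can still block on a partial line;
    # os.read(p.stdout.fileno(), 1024) avoids that
    line = p.stdout.readline()
else:
    line = None  # nothing to read yet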

Cubic answered 17/12, 2008 at 18:19 Comment(4)
I think this answer is unhelpful for two reasons: (a) The select module will not work on pipes under Windows (as the provided link clearly states), which defeats the OP's intentions to have a portable solution. (b) Asynchronous threads do not allow for a synchronous dialogue between the parent and the child process. What if the parent process wants to dispatch the next action according to the next line read from the child?!Brana
select is also not useful in that Python's reads will block even after the select, because it does not have standard C semantics and will not return partial data.Dronski
A separate thresd for reading from child's output solved my problem which was similar to this. If you need syncronous interaction I guess you can't use this solution (unless you know what output to expect). I would have accepted this answerTana
"Python's reads will block even after the select, because it does not have standard C semantics and will not return partial data" → Not if you use os.read, as e.g. the subprocess module does (for this reason).Brantbrantford
A
2

Adding this answer here since it provides the ability to set non-blocking pipes on Windows and Unix.

All the ctypes details are thanks to @techtonik's answer.

Here is a slightly modified version to be used on both Unix and Windows systems.

  • Python3 compatible (only minor change needed).
  • Includes posix version, and defines exception to use for either.

This way you can use the same function and exception for Unix and Windows code.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).

It's a generator, so you can, for example...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothing's left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case where reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
Antarctica answered 28/1, 2016 at 3:37 Comment(2)
(1) this comment indicates that readline() doesn't work with non-blocking pipes (such as set using fcntl) on Python 2 -- do you think it is no longer correct? (my answer contains the link (fcntl) that provides the same info but it seems deleted now). (2) See how multiprocessing.connection.Pipe uses SetNamedPipeHandleStateWast
I only tested this on Python3. But saw this information too and expect it remains valid. I also wrote my own code to use in-place of readline, I've updated my answer to include it.Antarctica
N
1

Why bother with thread & queue? Unlike readline(), BufferedReader.read1() won't block waiting for \r\n; it returns ASAP if there is any output coming in.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
Nalley answered 19/1, 2015 at 7:39 Comment(2)
Will it return ASAP if there is nothing comming in? If it does not it is blocking.Lime
@MathieuPagé is right. read1 will block if the first underlying read blocks, which happens when the pipe is still open but no input is available.Staats
S
1

In my case I needed a logging module that catches the output from the background applications and augments it (adding time-stamps, colors, etc.).

I ended up with a background thread that does the actual I/O. The following code is only for POSIX platforms. I stripped non-essential parts.

If someone is going to use this beast for long runs consider managing open descriptors. In my case it was not a big problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Sequin answered 1/2, 2015 at 16:32 Comment(0)
S
1

This is an example of running an interactive command in a subprocess, where stdout is made interactive by using a pseudo-terminal. You can refer to: https://mcmap.net/q/25514/-run-interactive-bash-with-popen-and-a-dedicated-tty-python

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Shoveler answered 25/3, 2017 at 3:40 Comment(0)
B
1

My problem is a bit different as I wanted to collect both stdout and stderr from a running process, but ultimately the same, since I wanted to render the output in a widget as it's generated.

I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.

After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I'm using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering, and then universal_newlines=True to process the stream as a text file instead of the binary that otherwise seems to become the default when setting bufsize=1.

import select
import subprocess

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
         return                    ## cmd was never created, so bail out here
      VERBOSE("task started...")
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

This solution is IMHO 99.99% effective, as it still uses the blocking readline function, so we assume the subprocess is well-behaved and outputs complete lines.

I welcome feedback to improve the solution as I am still new to Python.

Billye answered 27/6, 2017 at 21:45 Comment(2)
In this particular case, you can set stderr=subprocess.STDOUT in the Popen constructor, and get all output from cmd.stdout.readline().Janerich
Nice clear example. Was having trouble with select.select() but this resolved it for me.Valdes
M
0

I have created a library based on J. F. Sebastian's solution. You can use it.

https://github.com/cenkalti/what

Melgar answered 27/4, 2013 at 20:14 Comment(0)
E
0

EDIT: This implementation still blocks. Use J.F.Sebastian's answer instead.

I tried the top answer, but the additional risk and maintenance of thread code was worrisome.

Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.

import io
import time
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
    while p.poll() is None:
        time.sleep(SLEEP_DELAY)
        # peek() returns whatever is already buffered
        while b'\n' in buffer.peek():
            line = buffer.readline()
            # do stuff with the line

    # Handle any remaining output after the process has ended
    while buffer.peek():
        line = buffer.readline()
        # do stuff with the line
Esdras answered 7/5, 2013 at 17:38 Comment(2)
have you tried for line in iter(p.stdout.readline, ""): # do stuff with the line? It is threadless (single thread) and blocks when your code blocks.Wast
@j-f-sebastian Yeah, I eventually reverted to your answer. My implementation still occasionally blocked. I'll edit my answer to warn others not to go down this route.Esdras
P
0

Working from J.F. Sebastian's answer, and several other sources, I've put together a simple subprocess manager. It provides the requested non-blocking reading, as well as running several processes in parallel. It doesn't use any OS-specific call (that I'm aware of) and thus should work anywhere.

It's available from pypi, so just pip install shelljob. Refer to the project page for examples and full docs.
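
For orientation, here is a minimal sketch of what usage looks like, based on the Group interface the project documents; treat the exact names as an assumption and check the project page for the current API:

import sys
from shelljob import proc

g = proc.Group()
p = g.run(['ls', '-al', '/usr/local'])

# readlines() hands back whatever complete lines are available right now,
# without blocking, as (process, line) pairs
while g.is_pending():
    lines = g.readlines()
    for process, line in lines:
        sys.stdout.write(line)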

Pathoneurosis answered 31/10, 2013 at 10:9 Comment(0)
C
0

This solution uses the select module to "read any available data" from an IO stream. The function blocks initially until data is available, but then reads only the data that is available and doesn't block further.

Since it uses the select module, this only works on Unix.

The code is fully PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
Confutation answered 2/10, 2017 at 20:55 Comment(0)
E
0

I also faced the problem described by Jesse and solved it by using "select", as Bradley, Andy and others did, but in a blocking mode to avoid a busy loop. It uses a dummy pipe as a fake stdin. The select blocks and waits for either stdin or the pipe to be ready. When a key is pressed, stdin unblocks the select, and the key value can be retrieved with read(1). When a different thread writes to the pipe, the pipe unblocks the select, and this can be taken as an indication that the need for stdin is over. Here is some reference code:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    pass    # ... do your stuff with the key value

# Faked keystroke
else:
    pass    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
Entoil answered 7/4, 2018 at 7:54 Comment(1)
NOTE: In order to make this work on Windows, the pipe should be replaced by a socket. I haven't tried it yet, but it should work according to the documentation.Entoil
E
0

Try wexpect, which is the Windows alternative to pexpect.

import wexpect

p = wexpect.spawn('myprogram.exe')
p.expect('.')                 # regex pattern matching any character
output_str = p.after          # the text matched by the pattern
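
If you need the read to give up quickly instead of blocking, pexpect-style APIs let you include the TIMEOUT pattern in the expect list together with a short timeout; assuming wexpect mirrors pexpect here, a sketch:

import wexpect

p = wexpect.spawn('myprogram.exe')
# Wait at most one second for a complete line; index 0 means nothing arrived
index = p.expect([wexpect.TIMEOUT, '\r\n'], timeout=1)
if index == 1:
    line = p.before   # everything received up to the matched newline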
Evannia answered 7/12, 2019 at 14:13 Comment(0)
E
0

I'm gonna let you know there is one SUPER SIMPLE way to do this. I looked for quite a while and tried some of the complicated queue functions etc., only to stumble upon this comment, which explains that you just need to specify stdout=sys.stdout in the Popen call:

import subprocess
import sys

def execute(command):
    # The child's output goes straight to this process's stdout,
    # so there is never a pipe to drain or block on
    subprocess.Popen(command, shell=True, stdout=sys.stdout,
                     stderr=subprocess.STDOUT)
Ebullition answered 26/9, 2023 at 14:20 Comment(0)
L
0

This one is pure Python, easy to understand and works just fine:

import subprocess
import sys
import threading

# Function to read and print stdout of the subprocess
def readstdout():
    for l in iter(p.stdout.readline, b""):
        # readline() keeps the trailing newline, so don't add another one
        sys.stdout.write(l.decode("utf-8", "backslashreplace"))

# Function to read and print stderr of the subprocess
def readstderr():
    for l in iter(p.stderr.readline, b""):
        sys.stderr.write(l.decode("utf-8", "backslashreplace"))

# Function to send a command to the subprocess
def sendcommand(cmd):
    p.stdin.write(cmd.encode() + b"\n")
    p.stdin.flush()

# Create a subprocess
p = subprocess.Popen(
    "adb.exe -s 127.0.0.1:5555 shell",
    stdout=subprocess.PIPE,
    stdin=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# Create two threads to read and print stdout and stderr concurrently
t1 = threading.Thread(target=readstdout)
t2 = threading.Thread(target=readstderr)

# Start the threads to capture and print the subprocess output
t1.start()
t2.start()

# Send a command to the subprocess
sendcommand("ls")
Lid answered 23/10, 2023 at 1:42 Comment(0)
M
-2

Here is a module that supports non-blocking reads and background writes in Python:

https://pypi.python.org/pypi/python-nonblock

It provides a function, nonblock_read, which reads data from the stream if available, and otherwise returns an empty string (or None if the stream is closed on the other side and all possible data has been read).
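
A minimal sketch of that usage, going only by the description above (the exact signature is defined by the module, so check its docs):

import subprocess
from nonblock import nonblock_read

p = subprocess.Popen(['myprogram.exe'], stdout=subprocess.PIPE)

data = nonblock_read(p.stdout)
if data is None:
    pass    # stream closed on the other side and fully drained
elif data:
    pass    # got some output, process it
else:
    pass    # nothing available right now, do other work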

You may also consider the python-subprocess2 module,

https://pypi.python.org/pypi/python-subprocess2

which extends the subprocess module. The object returned by "subprocess.Popen" gains an additional method, runInBackground. This starts a thread and returns an object which is automatically populated as data is written to stdout/stderr, without blocking your main thread.

Enjoy!

Midwife answered 16/1, 2016 at 21:47 Comment(1)
I'd like to try out this nonblock module, but I'm relatively new to some of the Linux procedures. Exactly how do I install these routines? I'm running Raspbian Jessie, a flavor of Debian Linux for the Raspberry Pi. I tried 'sudo apt-get install nonblock' and python-nonblock and both threw an error - not found. I have downloaded the zip file from this site pypi.python.org/pypi/python-nonblock, but don't know what to do with it. Thanks....RDKPatrolman