Timeout on subprocess readline in Python
T

7

62

I have a small issue that I'm not quite sure how to solve. Here is a minimal example:

What I have

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    some_criterium = do_something(line)

What I would like

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    if nothing_happens_after_10s:
        break
    else:
        some_criterium = do_something(line)

I read a line from a subprocess and do something with it. How can I exit if no line arrived after a fixed time interval?

Toulon answered 25/5, 2012 at 14:38 Comment(4)
related: Non-blocking read on a subprocess.PIPE in python – Tigon
related: Stop reading process output in Python without hang? – Tigon
whispers criterion – Tonsillitis
@SteveCarter yes, the wording could be improved. I would gladly accept a corresponding edit. – Tigon
T
34

Thanks for all the answers!

I found a way to solve my problem by simply using select.poll to peek into standard output.

import select
...
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)
while(some_criterium and not time_limit):
    poll_result = poll_obj.poll(0)
    if poll_result:
        line = scan_process.stdout.readline()
        some_criterium = do_something(line)
    update(time_limit)
Toulon answered 25/5, 2012 at 17:42 Comment(7)
While this appears to work, it's not robust: consider what happens if your child process outputs something without a newline. select/poll will trigger, but readline will block indefinitely. – Recorder
May not work on Windows, where select.poll() is unavailable and select.select() only works for sockets. docs.python.org/2/library/select.html – Penology
I haven't tested the solution on Windows, so you might be right. I know it works under OSX and Linux. – Toulon
@gentimouton: asyncio can read a subprocess's output asynchronously in a portable manner. – Tigon
@DimaTisnek, so if no newline arrives at all, the program will still be blocked by the readline forever? – Clarhe
Tom's solution will block if the subprocess outputs some text, does not output a newline, and does not exit. The OP's intention (I think) was to hit a timeout in such a case. – Recorder
In my case, poll() sometimes returns an empty list even though additional lines could be read. – Frizzy
T
32

Here's a portable solution that enforces the timeout for reading a single line using asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def run_command(*args, timeout=None):
    # Start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec(*args,
            stdout=PIPE, stderr=STDOUT)

    # Read line (sequence of bytes ending with b'\n') asynchronously
    while True:
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            pass
        else:
            if not line: # EOF
                break
            elif do_something(line):
                continue # While some criterium is satisfied
        process.kill() # Timeout or some criterion is not satisfied
        break
    return await process.wait() # Wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop() # For subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

returncode = loop.run_until_complete(run_command("cmd", "arg 1", "arg 2",
                                                 timeout=10))
loop.close()
Tigon answered 6/12, 2015 at 6:32 Comment(7)
This is awesome, great work! I would suggest process.stdout.read() instead of readline() for someone else who may have more than just one expected line. – Irrefutable
@jftuga: .read() would be incorrect here. The question is about .readline(). If you need all the output then it is simpler to use .communicate() with timeout. Read my comment under the answer that uses .communicate(). – Tigon
@JanKaifer yes. Both the link to Python 3 docs and the explicit shebang #!... python3 point to Python 3. The current Python version is 3.6. The syntax in the answer is Python 3.5 (released in 2015). – Tigon
This is great if you can switch everything you do to asyncio. Want to interact with anything using queue.Queue? Tough, that breaks asyncio. Got a non-asyncio library that you want to register a callback with? Tough. asyncio doesn't interact well with anything else and seems to almost always be more trouble than it's worth. – Toulon
@Tom: In case it is not obvious, you can interact with code that doesn't use asyncio from within your asyncio code, e.g., via asyncio.to_thread. And yes, dealing with the async vs. blocking divide (having colored functions) is a general problem: journal.stuffwithstuff.com/2015/02/01/… – Tigon
Calling loop.run_until_complete() inside a function makes the program emit RuntimeWarning: coroutine 'run_command' was never awaited. Does anyone have an idea? – Mediatorial
@secavfr: the code worked as is (last time I tried). In 2022, I would replace everything starting with if sys.platform with just asyncio.run(main()) where inside async def main() you just await run_command(..). – Tigon
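
The asyncio.run(main()) form suggested in the last comment could look roughly like the sketch below. It assumes Python 3.7 or newer. The names read_lines and main, and the child command (the current interpreter printing one line and then going silent), are illustrative and not part of the answer above.

```python
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT


async def read_lines(*args, timeout=None):
    """Return (lines, returncode); kill the child if one line takes longer than *timeout*."""
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE, stderr=STDOUT)
    lines = []
    while True:
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            process.kill()  # no complete line arrived in time
            break
        if not line:  # EOF
            break
        lines.append(line)
    return lines, await process.wait()


async def main():
    # Hypothetical child: prints one line, then stays silent for 30 seconds.
    code = "import time; print('ready', flush=True); time.sleep(30)"
    return await read_lines(sys.executable, "-c", code, timeout=1)


if __name__ == "__main__":
    lines, returncode = asyncio.run(main())
    print(lines, returncode)
```

asyncio.run() creates and closes the event loop itself, so the platform check and the explicit loop handling are no longer needed in this variant.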
A
14

I used something a bit more general in Python (if I remember correctly, also pieced together from Stack Overflow questions, but I cannot recall which ones).

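try:
    import _thread as thread  # Python 3: the module was renamed to _thread
except ImportError:
    import thread  # Python 2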
from threading import Timer

def run_with_timeout(timeout, default, f, *args, **kwargs):
    if not timeout:
        return f(*args, **kwargs)
    try:
        timeout_timer = Timer(timeout, thread.interrupt_main)
        timeout_timer.start()
        result = f(*args, **kwargs)
        return result
    except KeyboardInterrupt:
        return default
    finally:
        timeout_timer.cancel()

Be warned, though. This uses an interrupt to stop whatever function you give it. This might not be a good idea for all functions and it also prevents you from closing the program with Ctrl + C during the timeout (i.e. Ctrl + C will be handled as a timeout).

You could use this and call it like:

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = run_with_timeout(timeout, None, scan_process.stdout.readline)
    if line is None:
        break
    else:
        some_criterium = do_something(line)

It might be a bit overkill, though. I suspect there is a simpler option for your case that I don't know.

Apeak answered 25/5, 2012 at 15:4 Comment(6)
It is not necessary to create a new thread for each line: a single watchdog thread is enough. – Tigon
Works like a charm and should be picked as best :-) Thanks, @Flogo! – Barathea
Isn't it better to put the first two lines of the try-block, i.e. from "timeout_timer = Timer(...)" up to "timeout_timer.start()", outside the try-except? – Yeh
@AshKetchum: the line timeout_timer.start() should be in the try-block. Imagine you have a very short time limit and there is a context switch after starting the thread and before entering the try-block. That could theoretically lead to a KeyboardInterrupt sent to the main thread. The line initializing the Timer could be outside, I guess. – Apeak
Does not seem to work on Ubuntu 18.04, Python 3.6.9. Although _thread.interrupt_main() gets executed, scan_process.stdout.readline() cannot be interrupted. – Irresolute
interrupt_main lives in the _thread module. Note the underscore. – Yount
D
8

While Tom's solution works, using select() in the C idiom is more compact. This is the equivalent of your answer:

from select import select
scan_process = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                bufsize=1)  # Line buffered
while some_criterium and not time_limit:
    poll_result = select([scan_process.stdout], [], [], time_limit)[0]

The rest is the same.

See pydoc select.select.

[Note: this is Unix-specific, as are some of the other answers.]

[Note 2: edited to add line buffering as per OP request]

[Note 3: the line buffering may not be reliable in all circumstances, leading to readline() blocking]

Darg answered 6/12, 2015 at 2:10 Comment(6)
note: this, as well as @Tom's answer, doesn't work on Windows, and it resets the timeout if any input is received. OP wants to reset the timeout only if a newline is received (though it is straightforward to accommodate this requirement). – Tigon
also, to avoid blocking on .readline() like in @Tom's answer, use os.read(scan_process.stdout.fileno(), 512) after the select. It is also not 100% safe if something else has access to the pipe, but it is less likely to block after the select than .readline(). – Tigon
I thought the whole idea was to block until either a line is read or timeout has been reached?... sorry if I'm misunderstanding. – Darg
think: if your code is blocked on readline(), how do you expect to respect the timeout? – Tigon
it won't block because it's line buffered. select() won't return poll_result as True if there isn't a whole line ready to readline(). – Darg
You don't know whether the child's stdout is line-buffered (bufsize=1 has no effect on the child process; it only regulates the buffer used in the parent to read the output), and typically stdout is block-buffered if it is redirected to a pipe, i.e., select() may return without the full line being available. – Tigon
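
The two suggestions in the comments above (read with os.read() after select(), and reset the timeout only when a newline arrives) can be combined as in the following sketch. It is Unix-only and only one possible arrangement. The helper read_line_with_timeout, the demo function, and the child command are hypothetical names introduced for illustration.

```python
import os
import select
import subprocess
import sys
import time


def read_line_with_timeout(pipe, buffer, timeout):
    """Return one complete line from *pipe*, b"" on EOF, or None on timeout.

    *buffer* is a bytearray that carries a partial line between calls.
    """
    deadline = time.monotonic() + timeout
    while b"\n" not in buffer:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        ready, _, _ = select.select([pipe], [], [], remaining)
        if not ready:
            return None
        # os.read returns whatever is available, so it does not wait for a newline
        chunk = os.read(pipe.fileno(), 512)
        if not chunk:  # EOF: hand back whatever is left (possibly b"")
            rest = bytes(buffer)
            buffer.clear()
            return rest
        buffer += chunk
    line, _, rest = bytes(buffer).partition(b"\n")
    buffer[:] = rest
    return line + b"\n"


def demo():
    # Hypothetical child: one full line, then a partial line, then silence.
    code = ("import sys, time; print('first', flush=True); "
            "sys.stdout.write('partial'); sys.stdout.flush(); time.sleep(30)")
    process = subprocess.Popen([sys.executable, "-c", code],
                               stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    pending = bytearray()
    results = []
    try:
        while True:
            line = read_line_with_timeout(process.stdout, pending, timeout=2.0)
            results.append(line)
            if not line:  # None (timeout) or b"" (EOF)
                break
    finally:
        process.kill()
        process.wait()
        process.stdout.close()
    return results
```

Because the deadline is computed once per line, a child that keeps emitting text without a newline still runs into the timeout. The sketch never reads through the buffered process.stdout object, so mixing it with os.read() on the same descriptor does not lose data here.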
G
5

Try using signal.alarm:

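# timeout.py (Python 3, Unix only)
import signal, sys

def timeout(sig, frm):
  print("This is taking too long...")
  sys.exit(1)

signal.signal(signal.SIGALRM, timeout)
signal.alarm(10)  # deliver SIGALRM after 10 seconds
byte = 0

# read two random bytes at a time until they happen to spell b'IT'
while b'IT' not in open('/dev/urandom', 'rb').read(2):
  byte += 2
print("I got IT in %s byte(s)!" % byte)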

A couple of runs to show it works:

$ python timeout.py 
This is taking too long...
$ python timeout.py 
I got IT in 4672 byte(s)!

For a more detailed example, see pGuides.

Grand answered 17/10, 2012 at 20:32 Comment(2)
This is Unix-only; it won't work on Windows, as SIGALRM and signal.alarm are unavailable there. – Signorina
This is the simplest if you are Unix-only and just need a way to bail out when something isn't happening as quickly as it should because something's wrong (and it works for any situation, not just reads). – Herron
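
Applied to the question's readline() loop, the same idea might look like the sketch below. It assumes Unix, Python 3, and that the code runs in the main thread (signal handlers can only be installed there). LineTimeout, _on_alarm and read_lines are illustrative names, not part of the answer above.

```python
import signal
import subprocess
import sys


class LineTimeout(Exception):
    """Raised by the SIGALRM handler when no line arrived in time."""


def _on_alarm(signum, frame):
    raise LineTimeout


def read_lines(command, timeout):
    """Collect lines until EOF or until one line takes longer than *timeout* whole seconds."""
    previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    lines = []
    timed_out = False
    try:
        while True:
            signal.alarm(timeout)  # re-arm the alarm before every read
            line = process.stdout.readline()
            signal.alarm(0)  # a line (or EOF) arrived: disarm
            if not line:
                break
            lines.append(line)
    except LineTimeout:
        timed_out = True
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)
        process.kill()
        process.wait()
        process.stdout.close()
    return lines, timed_out
```

A call such as read_lines([sys.executable, "-c", "import time; print('ready', flush=True); time.sleep(30)"], timeout=2) should return after about two seconds with the one line that arrived and timed_out set to True. There is a small window between readline() returning and signal.alarm(0) in which the alarm can still fire, so a line that arrives at the very last moment may be dropped.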
T
5

A portable solution is to use a thread to kill the child process if reading a line takes too long:

#!/usr/bin/env python3
from subprocess import Popen, PIPE, STDOUT

timeout = 10
with Popen(command, stdout=PIPE, stderr=STDOUT,
           universal_newlines=True) as process:  # text mode
    # kill process in timeout seconds unless the timer is restarted
    watchdog = WatchdogTimer(timeout, callback=process.kill, daemon=True)
    watchdog.start()
    for line in process.stdout:
        # don't invoke the watchdog callback if do_something() takes too long
        with watchdog.blocked:
            if not do_something(line):  # some criterium is not satisfied
                process.kill()
                break
            watchdog.restart()  # restart timer just before reading the next line
    watchdog.cancel()

where the WatchdogTimer class is like threading.Timer, except that it can be restarted and/or blocked:

from threading import Event, Lock, Thread
from subprocess import Popen, PIPE, STDOUT
from time import monotonic  # use time.time or monotonic.monotonic on Python 2

class WatchdogTimer(Thread):
    """Run *callback* in *timeout* seconds unless the timer is restarted."""

    def __init__(self, timeout, callback, *args, timer=monotonic, **kwargs):
        super().__init__(**kwargs)
        self.timeout = timeout
        self.callback = callback
        self.args = args
        self.timer = timer
        self.cancelled = Event()
        self.blocked = Lock()

    def run(self):
        self.restart() # don't start timer until `.start()` is called
        # wait until timeout happens or the timer is canceled
        while not self.cancelled.wait(self.deadline - self.timer()):
            # don't test the timeout while something else holds the lock
            # allow the timer to be restarted while blocked
            with self.blocked:
                if self.deadline <= self.timer() and not self.cancelled.is_set():
                    return self.callback(*self.args)  # on timeout

    def restart(self):
        """Restart the watchdog timer."""
        self.deadline = self.timer() + self.timeout

    def cancel(self):
        self.cancelled.set()
Tigon answered 6/12, 2015 at 8:42 Comment(0)
N
1

Using threading

import subprocess, threading, time

def _watcher(proc, delay):
    # Kill the child `delay` seconds after start, whether or not it is still producing output
    time.sleep(delay)
    proc.kill()

try:

    scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # daemon=True so the sleeping watcher does not keep the program alive on exit
    threading.Thread(target=_watcher, args=(scan_process, 10), daemon=True).start()

    while some_criterium:
        line = scan_process.stdout.readline()
        if not line:  # EOF: the child exited or was killed by the watcher
            break
        else:
            some_criterium = do_something(line)

except Exception as e:
    print(e)

Please also refer to "How to run a process with timeout and still get stdout at runtime".

Niki answered 6/7, 2023 at 1:26 Comment(0)
