How to timeout a long running program using rxpython?

Say I have a long-running Python function that looks something like this:

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x

I want to be able to set a timeout of 1000ms.

So I'm doing something like this: creating an observable and mapping it through the above intensive calculation.

a = Observable.repeat(1).map(lambda x: intns(x))

Now, for each value emitted, if it takes more than 1000ms, I want to end the observable as soon as 1000ms is reached, using on_error or on_completed.

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

The above statement does time out and calls on_error, but it still goes on to finish the intensive calculation and only then moves on to the next statements. Is there a better way of doing this?

The last statement prints the following

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

The idea is that if a particular function times out, I want to be able to continue with my program and ignore the result.

I was wondering whether running the intns function on a separate thread would help. I guess the main thread would continue execution after the timeout, but I would still want to stop computing intns on that thread, or kill it somehow.

Jokjakarta answered 20/7, 2017 at 22:30 Comment(0)
C
1

The following class can be used as a context manager, i.e. with timeout():

If the block inside it runs longer than the specified time, a TimeoutError is raised. Note that it relies on signal.SIGALRM, so it only works on Unix and only in the main thread.

import signal

class timeout:
    # Default value is 1 second (1000ms)
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
    def __exit__(self, type, value, traceback):
        signal.alarm(0)

# example usage
with timeout() :
    # infinite while loop so timeout is reached
    while True :
        pass

If I'm understanding your function, here's what your implementation would look like:

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    with timeout() :
        time.sleep(y)
    print('end')
    return x
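
For the "continue with my program and ignore the result" part of the question, the TimeoutError has to be caught somewhere. Below is a minimal sketch of one way to do that, assuming Unix and the main thread. The run_with_timeout helper, the slow_identity function, and the use of signal.setitimer (to allow fractional seconds such as 0.2) are illustrative assumptions, not part of the answer above.

```python
import signal
import time


class timeout:
    """Raise TimeoutError if the with-block runs too long (Unix, main thread only)."""

    def __init__(self, seconds=1.0, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)

    def __enter__(self):
        # Remember the previous handler so it can be restored afterwards.
        self._old_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        # Assumption: setitimer is used instead of alarm to allow fractional seconds.
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending timer
        signal.signal(signal.SIGALRM, self._old_handler)
        return False  # do not swallow exceptions here


def run_with_timeout(func, arg, seconds, default=None):
    """Hypothetical helper: return func(arg), or `default` if it takes too long."""
    try:
        with timeout(seconds):
            return func(arg)
    except TimeoutError:
        return default


def slow_identity(x):
    """Stand-in for the long-running intns function."""
    time.sleep(5)
    return x
```

Calling run_with_timeout(slow_identity, 1, seconds=0.2, default='timed out') should return the default shortly after the timer fires, because the exception raised in the signal handler interrupts time.sleep.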
Christa answered 4/12, 2017 at 1:26 Comment(0)
A
0

You can do this partially using threading. Although there is no direct way to kill a thread in Python, you can set a flag that tells the thread to stop.

This won't work if the thread is blocked waiting on other resources (in your case, you simulated long-running code with a single random wait).

See also Is there any way to kill a Thread in Python?
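
A minimal sketch of the flag idea, assuming the work can be split into short chunks so the flag can be checked in between. The names worker and run_with_flag are hypothetical, and time.sleep(0.05) merely stands in for one chunk of real work.

```python
import threading
import time


def worker(stop_event, steps, results):
    """Do `steps` short chunks of work, checking the stop flag before each chunk."""
    for _ in range(steps):
        if stop_event.is_set():
            results.append('stopped')
            return
        time.sleep(0.05)  # stands in for one chunk of real work
    results.append('finished')


def run_with_flag(steps, timeout_seconds):
    """Hypothetical helper: run the worker and ask it to stop after timeout_seconds."""
    stop_event = threading.Event()
    results = []
    thr = threading.Thread(target=worker, args=(stop_event, steps, results))
    thr.start()
    thr.join(timeout_seconds)  # wait at most timeout_seconds
    if thr.is_alive():
        stop_event.set()       # request a cooperative stop
        thr.join()             # returns once the current chunk has finished
    return results[0]
```

The thread is never killed; it only stops at the next flag check, so a single long blocking call inside a chunk would still run to completion.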

Arrear answered 6/9, 2017 at 20:39 Comment(2)
An example showing this behavior would be pretty nice - Andean
Please see new answer - Arrear
I
0

This way it works:

import random
import time
import threading
import os

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x


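thr = threading.Thread(target=intns, args=(10,))
thr.start()
# time.clock() was removed in Python 3.8; use a monotonic wall clock instead
st = time.monotonic()
while thr.is_alive():
    if time.monotonic() - st > 9:
        os._exit(0)  # hard-exits the whole process, not just the thread
    time.sleep(0.01)  # avoid a busy-wait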
Iodometry answered 7/9, 2017 at 10:34 Comment(3)
It works, but it is in no way related to the rx package, which is the core of the question - Andean
Not really, he didn't ask how to do it with rx, he just said that he tried doing it with rx - Iodometry
True, I didn't notice the 'rx' in 'rxpython'. Anyway, I think he wasn't really interested in the rx package - Iodometry
A
0

Here's an example of a timeout using a flag that the thread checks:

import random
import time
import threading

_timeout = 0

def intns(loops=1):
    print('begin')
    processing = 0
    for i in range(loops):
        y = random.randint(5,10)
        time.sleep(y)
        if _timeout == 1:
            print('timedout end')
            return
        print('keep processing')
    return

# this will timeout
timeout_seconds = 10
loops = 10

# this will complete
#timeout_seconds = 30.0
#loops = 1

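thr = threading.Thread(target=intns, args=(loops,))
thr.start()
# time.clock() was removed in Python 3.8; use a monotonic wall clock instead
st = time.monotonic()
while thr.is_alive():
    if time.monotonic() - st > timeout_seconds:
        _timeout = 1  # the worker checks this flag after each sleep
    time.sleep(0.01)  # avoid a busy-wait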

thr.join()
if _timeout == 0:
    print ("completed")
else:
    print ("timed-out")
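
As a possible alternative to polling the clock in a loop, a threading.Timer could set the flag once the timeout elapses. This is a minimal sketch under that assumption; the names intns_chunked and run_with_timer are hypothetical, and the short sleeps stand in for real work.

```python
import threading
import time


def intns_chunked(loops, timed_out, chunk_seconds=0.05):
    """Run `loops` chunks of work, giving up once the timed_out flag is set."""
    for _ in range(loops):
        time.sleep(chunk_seconds)  # stands in for one chunk of real work
        if timed_out.is_set():
            return 'timed-out'
    return 'completed'


def run_with_timer(loops, timeout_seconds):
    """Hypothetical helper: a Timer sets the flag after timeout_seconds."""
    timed_out = threading.Event()
    timer = threading.Timer(timeout_seconds, timed_out.set)
    timer.start()
    try:
        # The work runs in the calling thread; only the timer runs in the background.
        return intns_chunked(loops, timed_out)
    finally:
        timer.cancel()  # harmless if the timer has already fired
```

As with the polling version, the work only stops between chunks, so the same limitation applies when a single step blocks for a long time.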
Arrear answered 8/9, 2017 at 15:29 Comment(3)
And how do you suggest combining it with rx? - Hallah
This is a generic workaround. The Observable function can set a timer, check between tasks whether it has timed out, and return on_error. It has the limitation stated above, depending on where the waiting or time-consuming processing happens - Arrear
Using threads directly is a bad approach, especially for novices and non-systems programmers! It is safer to use Python's Timer object (from threading import Timer), which also meets the requirement of specifying the timeout in milliseconds rather than whole seconds, since it accepts fractional seconds - docs.python.org/2.4/lib/timer-objects.html. Using time.sleep or time.clock (which someone uses in another post) is, in my view, an unnecessarily complicated solution - Frozen
G
0

You can use time.sleep() inside a while loop that checks the elapsed time with time.clock().

Gyn answered 18/1, 2018 at 6:42 Comment(0)
