How to limit execution time of a function call? [duplicate]

There is a socket-related function call in my code; that function is from another module and thus out of my control. The problem is that it occasionally blocks for hours, which is totally unacceptable. How can I limit the function execution time from my code? I guess the solution must utilize another thread.

Tin answered 14/12, 2008 at 16:20 Comment(0)

I'm not sure how cross-platform this might be, but using signals and alarm might be a good way of looking at this. With a little work you could make this completely generic as well and usable in any situation.

http://docs.python.org/library/signal.html

So your code is going to look something like this.

import signal

def signal_handler(signum, frame):
    raise Exception("Timed out!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(10)   # Ten seconds
try:
    long_function_call()
except Exception as msg:
    print("Timed out!")
Riana answered 14/12, 2008 at 17:27 Comment(7)
What if I'm using alarm for something else somewhere else? :-) – Dreyfus
Yeah, it's probably worth picking one or the other. I'm a process/signal guy myself. After watching blip.tv/file/2232410 I find myself trusting Python's thread model less and less. – Riana
Also, this doesn't disable the alarm afterwards. – Overheat
Not all functions can be interrupted by a signal. – Pulitzer
@Riana Hi, what's the name of the video? The link blip.tv/file/2232410 is dead. – Lockjaw
I have a function that just calls sleep(10), and I set the timeout to 5 seconds. This does not interrupt my sleep function. – Discard
signal.alarm and signal.SIGALRM are available only on Unix. – Outgeneral
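Following up on the comments above (the alarm not being disabled afterwards, and alarm possibly being in use elsewhere), here is a minimal Unix-only sketch that cancels the alarm and restores any previous handler when the call finishes; the helper name call_with_alarm is my own:

```python
import signal

def call_with_alarm(func, seconds):
    """Run func() under a SIGALRM timeout (call_with_alarm is a made-up name).
    Unix-only, and must be called from the main thread."""
    def handler(signum, frame):
        raise TimeoutError("Timed out!")
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        return func()
    finally:
        signal.alarm(0)                             # disable the pending alarm
        signal.signal(signal.SIGALRM, old_handler)  # restore previous handler
```

This way a caller that also uses SIGALRM gets its handler back after the timed call.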

An improvement on @rik.the.vik's answer would be to use the with statement to give the timeout function some syntactic sugar:

import signal
from contextlib import contextmanager

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


try:
    with time_limit(10):
        long_function_call()
except TimeoutException as e:
    print("Timed out!")
Cheapen answered 2/3, 2009 at 3:14 Comment(7)
try: yield \n finally: signal.alarm(0) – Pulitzer
To be ultra fussy, does the signal.alarm(seconds) belong in the try? – Overheat
Well, it isn't documented as raising any exceptions, so probably not. – Cheapen
Instead of the last 'Timed out!' you probably intended to write msg. – Thimbleweed
Hello! I've attempted to implement this method in Python 3.4, and have run into an error I am having difficulty addressing: AttributeError: __exit__. The traceback only provides this: File "<stdin>", line 1, in <module>. Any idea how to fix this issue? – Obliquely
This is a great answer. Wanted to point out that the IDE autofilled the error class with a Python 3 built-in, which I ended up using instead: TimeoutError. – Maurreen
A note: if you prefer the context manager class structure with __init__(), __enter__(), and __exit__() instead of @contextlib.contextmanager, be sure that __exit__() returns True to suppress the error raised by the alarm. – Abbevillian
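For anyone who wants the class-based context manager mentioned in the last comment, here is a rough Unix-only sketch (TimeLimit is a made-up name); as the comment says, __exit__() returning True is what suppresses the timeout exception:

```python
import signal

class TimeoutException(Exception):
    pass

class TimeLimit:
    """Class-based variant of time_limit (TimeLimit is a made-up name).
    Unix-only; __exit__() returning True suppresses the timeout exception."""
    def __init__(self, seconds):
        self.seconds = seconds

    def _handler(self, signum, frame):
        raise TimeoutException("Timed out!")

    def __enter__(self):
        self._old = signal.signal(signal.SIGALRM, self._handler)
        signal.alarm(self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)                           # always cancel the pending alarm
        signal.signal(signal.SIGALRM, self._old)  # restore previous handler
        return exc_type is TimeoutException       # suppress only our own timeout
```

With this variant, execution simply continues after the with block when the limit is hit, instead of propagating an exception.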

Here's a Linux/OSX way to limit a function's running time. This is in case you don't want to use threads, and want your program to wait until the function ends, or the time limit expires.

from multiprocessing import Process
from time import sleep

def f(time):
    sleep(time)


def run_with_limited_time(func, args, kwargs, time):
    """Runs a function with time limit

    :param func: The function to run
    :param args: The function's args, given as a tuple
    :param kwargs: The function's keyword arguments, given as a dict
    :param time: The time limit in seconds
    :return: True if the function ended successfully. False if it was terminated.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        return False

    return True


if __name__ == '__main__':
    print(run_with_limited_time(f, (1.5, ), {}, 2.5)) # True
    print(run_with_limited_time(f, (3.5, ), {}, 2.5)) # False
Dactylography answered 30/10, 2014 at 22:5 Comment(3)
Works perfectly on my Linux laptop, doesn't work on Windows, and hardly works on OSX. Not your fault, just the wonderful world of programming. – Samellasameness
@CaseyPrimozic You are spawning new processes; that might be the reason. I'll correct the answer to Unix, not cross-platform. – Dactylography
Hello! I'm not familiar with the multiprocessing module. How could the code be modified so that, if the function ends normally, it returns the time spent? Is that possible? – Palingenesis
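To address the last comment, here is a hedged sketch of how the elapsed time could be reported (run_timed is a hypothetical variant of the answer's function; capturing the function's return value would additionally need a multiprocessing.Queue or Pipe):

```python
from multiprocessing import Process
from time import monotonic, sleep

def run_timed(func, args, kwargs, limit):
    """Variant of run_with_limited_time that also returns the wall time spent.

    Returns (finished_ok, elapsed_seconds). For a terminated run, the
    elapsed time is roughly the limit itself.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    start = monotonic()
    p.start()
    p.join(limit)
    elapsed = monotonic() - start
    if p.is_alive():
        p.terminate()
        p.join()  # reap the terminated child
        return False, elapsed
    return True, elapsed

if __name__ == '__main__':
    print(run_timed(sleep, (0.5,), {}, 2.0))  # finishes in time
    print(run_timed(sleep, (3.5,), {}, 1.0))  # terminated at the limit
```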

I prefer a context manager approach because it allows the execution of multiple Python statements within a with time_limit statement. Because Windows does not have SIGALRM, a more portable, and perhaps more straightforward, method is to use a Timer:

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    def __init__(self, msg=''):
        self.msg = msg

@contextmanager
def time_limit(seconds, msg=''):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        raise TimeoutException("Timed out for operation {}".format(msg))
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

import time
# ends after 5 seconds
with time_limit(5, 'sleep'):
    for i in range(10):
        time.sleep(1)

# this will actually end after 10 seconds
with time_limit(5, 'sleep'):
    time.sleep(10)

The key technique here is the use of _thread.interrupt_main to interrupt the main thread from the timer thread. One caveat is that the main thread does not always respond to the KeyboardInterrupt raised by the Timer quickly. For example, time.sleep() calls a system function so a KeyboardInterrupt will be handled after the sleep call.

Franks answered 6/6, 2016 at 1:41 Comment(3)
This seems cleanest. Is there any way to distinguish the interrupt from a keyboard interrupt sent by the user? – Watercourse
As an aside, this will kill all threads and is not appropriate as a timer to kill idle threads in a multi-threaded operation. – Sleeper
Because the thread module has been renamed to _thread in Python 3, for compatibility with Python 2 you would need: import sys; if sys.version_info[0] < 3: from thread import interrupt_main; else: from _thread import interrupt_main; and then later: timer = threading.Timer(seconds, lambda: interrupt_main()) – Outskirts
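Regarding the first comment (telling the timer's interrupt apart from a real Ctrl-C), one hedged sketch is to set a flag from the timer thread before interrupting, and re-raise genuine user interrupts unchanged; the timed_out Event is my own addition to the answer's code:

```python
from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    pass

@contextmanager
def time_limit(seconds, msg=''):
    timed_out = threading.Event()   # set by the timer, never by a user's Ctrl-C

    def fire():
        timed_out.set()
        _thread.interrupt_main()

    timer = threading.Timer(seconds, fire)
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        if timed_out.is_set():
            raise TimeoutException("Timed out for operation {}".format(msg))
        raise  # genuine user interrupt: propagate unchanged
    finally:
        timer.cancel()
```

The same caveat as above applies: a single long blocking call such as time.sleep(10) is only interrupted once it returns.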

Here is a simple way of getting the desired effect:

https://pypi.org/project/func-timeout

This saved my life.

And now an example of how it works: let's say you have a huge list of items to be processed, and you are iterating your function over those items. For some strange reason, your function gets stuck on item n without raising an exception. You need the other items to be processed, the more the better. In this case, you can set a timeout for processing each item:

import time
import func_timeout


def my_function(n):
    """Sleep for n seconds and return n squared."""
    print(f'Processing {n}')
    time.sleep(n)
    return n**2


def main_controller(max_wait_time, all_data):
    """
    Feed my_function with a list of items to process (all_data).

    However, if max_wait_time is exceeded, return the item and a fail info.
    """
    res = []
    for data in all_data:
        try:
            my_square = func_timeout.func_timeout(
                max_wait_time, my_function, args=[data]
                )
            res.append((my_square, 'processed'))
        except func_timeout.FunctionTimedOut:
            print('error')
            res.append((data, 'fail'))
            continue

    return res


timeout_time = 2.1  # my time limit
all_data = range(1, 10)  # the data to be processed

res = main_controller(timeout_time, all_data)
print(res)
Confiscable answered 10/2, 2021 at 19:40 Comment(4)
I used func_timeout on Windows as an alternative to solutions that use SIGALRM, which is only available on Unix. – Adder
I think it's the best solution for timing out a function if it's running on Windows and you don't intend to do multi-processing. – Vulcan
@erickfis, thanks for sharing this solution. Would this solution kill the background thread/abort after the timeout limit, or would it still run in the background (trying to avoid a deadlock scenario where the process hangs for a long time)? Thanks. – Markman
Gave me "Aborted (core dumped)" unfortunately. – Tungstate
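If installing a third-party package is not an option, a rough stdlib approximation with concurrent.futures can be sketched as follows. Note the important difference, hinted at in the comments above: the stuck worker thread keeps running after the timeout (and can delay later items), whereas func_timeout actively stops it.

```python
import concurrent.futures
import time

def my_function(n):
    """Sleep for n seconds and return n squared."""
    time.sleep(n)
    return n ** 2

def main_controller(max_wait_time, all_data):
    """Process all_data, tagging each item 'processed' or 'fail' on timeout.

    Sketch only: a timed-out worker keeps running in the background, so a
    stuck item can delay the items after it and the final pool shutdown.
    """
    res = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        for data in all_data:
            future = pool.submit(my_function, data)
            try:
                res.append((future.result(timeout=max_wait_time), 'processed'))
            except concurrent.futures.TimeoutError:
                res.append((data, 'fail'))  # worker is still running in background
    return res
```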

Doing this from within a signal handler is dangerous: you might be inside an exception handler at the time the exception is raised, and leave things in a broken state. For example,

def function_with_enforced_timeout():
    f = open_temporary_file()
    try:
        ...
    finally:
        here()
        unlink(f.filename)

If the exception is raised while here() is executing, the temporary file will never be deleted.

The solution here is for asynchronous exceptions to be postponed until the code is not inside exception-handling code (an except or finally block), but Python doesn't do that.

Note that this won't interrupt anything while executing native code; it'll only interrupt it when the function returns, so this may not help this particular case. (SIGALRM itself might interrupt the call that's blocking--but socket code typically simply retries after an EINTR.)

Doing this with threads is a better idea, since it's more portable than signals. Since you're starting a worker thread and blocking until it finishes, there are none of the usual concurrency worries. Unfortunately, there's no way to deliver an exception asynchronously to another thread in Python (other thread APIs can do this). It'll also have the same issue with sending an exception during an exception handler, and require the same fix.

Dymoke answered 11/7, 2009 at 20:30 Comment(1)
Native code may call PyErr_CheckSignals() before restarting on EINTR, to allow a Python signal handler to run. – Pulitzer

The only "safe" way to do this, in any language, is to use a secondary process to do that timeout-thing, otherwise you need to build your code in such a way that it will time out safely by itself, for instance by checking the time elapsed in a loop or similar. If changing the method isn't an option, a thread will not suffice.

Why? Because you risk leaving things in a bad state when you do. If the thread is simply killed mid-method, any locks it holds will just stay held and can never be released.

So look at the process way, do not look at the thread way.

Roebuck answered 14/12, 2008 at 17:20 Comment(0)

You don't have to use threads. You can use another process to do the blocking work, for instance, maybe using the subprocess module. If you want to share data structures between different parts of your program then Twisted is a great library for giving yourself control of this, and I'd recommend it if you care about blocking and expect to have this trouble a lot. The bad news with Twisted is you have to rewrite your code to avoid any blocking, and there is a fair learning curve.

You can use threads to avoid blocking, but I'd regard this as a last resort, since it exposes you to a whole world of pain. Read a good book on concurrency before even thinking about using threads in production, e.g. Jean Bacon's "Concurrent Systems". I work with a bunch of people who do really cool high performance stuff with threads, and we don't introduce threads into projects unless we really need them.

Henrieta answered 14/12, 2008 at 17:13 Comment(0)

I would usually prefer using a contextmanager as suggested by @josh-lee.

But in case someone is interested in having this implemented as a decorator, here's an alternative.

Here's how it would look:

import time
from timeout import timeout

class Test(object):
    @timeout(2)
    def test_a(self, foo, bar):
        print(foo)
        time.sleep(1)
        print(bar)
        return 'A Done'

    @timeout(2)
    def test_b(self, foo, bar):
        print(foo)
        time.sleep(3)
        print(bar)
        return 'B Done'

t = Test()
print(t.test_a('python', 'rocks'))
print(t.test_b('timing', 'out'))

And this is the timeout.py module:

import threading

class TimeoutError(Exception):
    pass

class InterruptableThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        threading.Thread.__init__(self)
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._result = None

    def run(self):
        self._result = self._func(*self._args, **self._kwargs)

    @property
    def result(self):
        return self._result


class timeout(object):
    def __init__(self, sec):
        self._sec = sec

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            it = InterruptableThread(f, *args, **kwargs)
            it.start()
            it.join(self._sec)
            if not it.is_alive():
                return it.result
            raise TimeoutError('execution expired')
        return wrapped_f

The output:

python
rocks
A Done
timing
Traceback (most recent call last):
  ...
timeout.TimeoutError: execution expired
out

Notice that even if the TimeoutError is raised, the decorated method will continue to run in a different thread. If you also want that thread to be "stopped", see: Is there any way to kill a Thread in Python?

Proofread answered 27/1, 2016 at 13:38 Comment(3)
1 - The InterruptableThread class has no interrupt method. 2 - Even after the timeout expires, the thread is still executing the function. – Groin
TimeoutError is a built-in exception today: docs.python.org/3/library/exceptions.html – Hefty
That works perfectly for me. Thanks a lot. – Hereditament

Here's a timeout function I think I found via google and it works for me.

From: http://code.activestate.com/recipes/473878/

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    '''This function will spawn a thread and run the given function
    using the args and kwargs, and return the given default value if
    the timeout_duration is exceeded.
    '''
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            try:
                self.result = func(*args, **kwargs)
            except:
                self.result = default
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.is_alive():
        return it.result
    else:
        return it.result
Aveyron answered 15/12, 2008 at 4:41 Comment(3)
Why do both parts of the if/else block return it.result? – Ortensia
That was my edit for the specific problem I was dealing with. I don't remember why. Anyway, the original returned "default" if it.is_alive(). – Aveyron
Please note this function will not stop the thread after the timeout, leaving unfinished zombie threads using resources. – Viscosity
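One mitigation for the zombie-thread caveat in the last comment: making the worker a daemon thread does not stop it, but at least it can no longer keep the interpreter alive at exit. A hedged sketch of that variant (timeout_call is a made-up name):

```python
import threading

def timeout_call(func, args=(), kwargs=None, timeout_duration=1, default=None):
    """Recipe variant with a daemon worker; a stuck call still runs in the
    background, but it cannot block interpreter shutdown."""
    kwargs = kwargs or {}
    result = [default]  # mutable cell so the worker can publish its result

    def target():
        try:
            result[0] = func(*args, **kwargs)
        except Exception:
            result[0] = default

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_duration)
    return result[0]
```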

The method from @user2283347 works in testing, but we want to get rid of the traceback messages. Using the pass trick from "Remove traceback in Python on Ctrl-C", the modified code is:

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        pass     
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

def timeout_svm_score(i):
    #from sklearn import svm
    #import numpy as np
    #from IPython.core.display import display
    #%store -r names X Y
    clf = svm.SVC(kernel='linear', C=1).fit(np.nan_to_num(X[[names[i]]]), Y)
    score = clf.score(np.nan_to_num(X[[names[i]]]), Y)
    #scoressvm.append((score, names[i]))
    display((score, names[i]))
     
%%time
with time_limit(5):
    i=0
    timeout_svm_score(i)
#Wall time: 14.2 s

%%time
with time_limit(20):
    i=0
    timeout_svm_score(i)
#(0.04541284403669725, '计划飞行时间')
#Wall time: 16.1 s

%%time
with time_limit(5):
    i=14
    timeout_svm_score(i)
#Wall time: 5h 43min 41s

We can see that this method may take a very long time to interrupt the calculation: we asked for 5 seconds, but it took over 5 hours to stop.

Jerrine answered 11/9, 2020 at 22:37 Comment(0)

Using a simple decorator

Here's the version I made after studying the answers above. Pretty straightforward.

import signal
from contextlib import contextmanager
from functools import wraps


class TimeoutException(Exception):
    pass


def function_timeout(seconds: int):
    """Decorator factory: pass the timeout in seconds."""

    def decorator(func):
        @contextmanager
        def time_limit(seconds_):
            def signal_handler(signum, frame):  # noqa
                raise TimeoutException(f"Timed out in {seconds_} seconds!")

            signal.signal(signal.SIGALRM, signal_handler)
            signal.alarm(seconds_)
            try:
                yield
            finally:
                signal.alarm(0)

        @wraps(func)
        def wrapper(*args, **kwargs):
            with time_limit(seconds):
                return func(*args, **kwargs)

        return wrapper

    return decorator

How to use?

@function_timeout(seconds=5)
def my_naughty_function():
    while True:
        print("Try to stop me ;-p")

Well of course, don't forget to import the function if it is in a separate file.

Chromogenic answered 21/4, 2022 at 18:47 Comment(0)

This code works on Windows Server Datacenter 2016 with Python 3.7.3; I didn't test it on Unix. After mixing some answers from Google and Stack Overflow, it finally worked for me like this:

from multiprocessing import Process, Lock
import time
import os

def f(lock,id,sleepTime):
    lock.acquire()
    print("I'm P"+str(id)+" Process ID: "+str(os.getpid()))
    lock.release()
    time.sleep(sleepTime)   #sleeps for some time
    print("Process: "+str(id)+" took this much time:"+str(sleepTime))
    time.sleep(sleepTime)
    print("Process: "+str(id)+" took this much time:"+str(sleepTime*2))

if __name__ == '__main__':
    timeout_function=float(9) # 9 seconds for max function time
    print("Main Process ID: "+str(os.getpid()))
    lock=Lock()
    p1=Process(target=f, args=(lock,1,6,))   #Here you can change from 6 to 3 for instance, so you can watch the behavior
    start=time.time()
    print(type(start))
    p1.start()
    if p1.is_alive():
        print("process running a")
    else:
        print("process not running a")
    while p1.is_alive():
        timeout=time.time()
        if timeout-start > timeout_function:
            p1.terminate()
            print("process terminated")
        print("watching, time passed: "+str(timeout-start) )
        time.sleep(1)
    if p1.is_alive():
        print("process running b")
    else:
        print("process not running b")
    p1.join()
    if p1.is_alive():
        print("process running c")
    else:
        print("process not running c")
    end=time.time()
    print("I am the main process, the two processes are done")
    print("Time taken:- "+str(end-start)+" secs")   #MainProcess terminates at approx ~ 5 secs.
    time.sleep(5) # To see if on Task Manager the child process is really being terminated, and it is
    print("finishing")

The main code is from this link: Create two child process using python(windows)

Then I used .terminate() to kill the child process. You can see that the function f prints twice, once after sleepTime seconds and again after twice that; with the 9-second limit and terminate(), the second print never appears.

It worked for me, hope it helps!

Cence answered 4/5, 2022 at 12:21 Comment(0)
