Python timeout context manager with threads
Asked Answered
W

5

12

I have timeout context manager that works perfectly with signals but it raises error in multithread mode because signals work only in main thread.

def timeout_handler(signum, frame):
    raise TimeoutException()

@contextmanager
def timeout(seconds):
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

I've seen decorator implementation of timeout but I don't know how to pass yield inside class derived from threading.Thread. My variant won't work.

@contextmanager
def timelimit(seconds):
    class FuncThread(threading.Thread):
        def run(self):
            yield

    it = FuncThread()        
    it.start()
    it.join(seconds)

    if it.isAlive():
        raise TimeoutException()
Warnerwarning answered 22/2, 2013 at 6:54 Comment(0)
C
16

If the code guarded by the context manager is loop-based, consider handling this the way people handle thread killing. Killing another thread is generally unsafe, so the standard approach is to have the controlling thread set a flag that's visible to the worker thread. The worker thread periodically checks that flag and cleanly shuts itself down. Here's how you can do something analogous with timeouts:

class timeout(object):
    def __init__(self, seconds):
        self.seconds = seconds
    def __enter__(self):
        self.die_after = time.time() + self.seconds
        return self
    def __exit__(self, type, value, traceback):
        pass
    @property
    def timed_out(self):
        return time.time() > self.die_after

Here's a single-threaded usage example:

with timeout(1) as t:
    while True: # this will take a long time without a timeout
        # periodically check for timeouts
        if t.timed_out:
            break # or raise an exception
        # do some "useful" work
        print "."
        time.sleep(0.2)

and a multithreaded one:

import thread
def print_for_n_secs(string, seconds):
    with timeout(seconds) as t:
        while True:
            if t.timed_out:
                break # or raise an exception
            print string,
            time.sleep(0.5)

for i in xrange(5):
    thread.start_new_thread(print_for_n_secs,
                            ('thread%d' % (i,), 2))
    time.sleep(0.25)

This approach is more intrusive than using signals but it works for arbitrary threads.

Clemens answered 3/3, 2013 at 20:12 Comment(1)
It's a possible approach but not so short and clear as I'd like to get. Your variant requires to wrap code with function like decorator, but it's the new approach for me, so I gave you bounty. Thanks.Warnerwarning
P
3

I cannot see a way of doing what you are proposing with a context manager, you cannot yield the flow from one thread to another. What I would do is wrap your function with an interrutable thread with the timeout. Here is a recipe for that.

You will have an extra thread and the syntax won't be as nice but it would work.

Pompeii answered 26/2, 2013 at 2:5 Comment(1)
Note that the interruptable thread described by the recipe isn't actually interrupted, and actually keeps running. AFAIK there's not reliable way to interrupt a non-main python thread.Nisse
J
1

I know it's late but I'm only just reading this, but what about creating your own signaller/context manager? I'm new to python would love feedback from experienced devs this implementation.

This is based off of the answer from "Mr Fooz"

class TimeoutSignaller(Thread):
    def __init__(self, limit, handler):
        Thread.__init__(self)
        self.limit = limit
        self.running = True
        self.handler = handler
        assert callable(handler), "Timeout Handler needs to be a method"

    def run(self):
        timeout_limit = datetime.datetime.now() + datetime.timedelta(seconds=self.limit)
        while self.running:
            if datetime.datetime.now() >= timeout_limit:
                self.handler()
                self.stop_run()
                break

    def stop_run(self):
        self.running = False

class ProcessContextManager:
    def __init__(self, process, seconds=0, minutes=0, hours=0):
        self.seconds = (hours * 3600) + (minutes * 60) + seconds
        self.process = process
        self.signal = TimeoutSignaller(self.seconds, self.signal_handler)

    def __enter__(self):
        self.signal.start()
        return self.process

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.signal.stop_run()

    def signal_handler(self):
        # Make process terminate however you like
        # using self.process reference
        raise TimeoutError("Process took too long to execute")

Use case:

with ProcessContextManager(my_proc) as p:
    # do stuff e.g.
    p.execute()
Jair answered 6/7, 2017 at 12:5 Comment(0)
C
-1

Similar implementation as Mr Fooz but using the contextlib library:

import time
from contextlib import contextmanager

@contextmanager
def timeout(seconds):
    """
    A simple context manager to enable timeouts.

    Example:

        with timeout(5) as t:
            while True:
                if t():
                    # handle
    """
    stop = time.time() + seconds
    def timed_out():
        return time.time() > stop

    yield timed_out
Cooncan answered 21/7, 2022 at 22:41 Comment(0)
J
-3

Timeouts for system calls are done with signals. Most blocking system calls return with EINTR when a signal happens, so you can use alarm to implement timeouts.

Here's a context manager that works with most system calls, causing IOError to be raised from a blocking system call if it takes too long.

import signal, errno
from contextlib import contextmanager
import fcntl

@contextmanager
def timeout(seconds):
    def timeout_handler(signum, frame):
        pass

    original_handler = signal.signal(signal.SIGALRM, timeout_handler)

    try:
        signal.alarm(seconds)
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, original_handler)

with timeout(1):
    f = open("test.lck", "w")
    try:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
    except IOError, e:
        if e.errno != errno.EINTR:
            raise e
        print "Lock timed out"
Johnnyjumpup answered 22/2, 2013 at 6:57 Comment(2)
As in my 1st variant I got ValueError: signal only works in main thread in line original_handler = signal.signal(signal.SIGALRM, timeout_handler)Warnerwarning
As the OP stated, signals only work in the main thread. The OP needs a different solution instead.Hamner

© 2022 - 2024 — McMap. All rights reserved.