How to break time.sleep() running in a Python concurrent.futures worker

I am playing around with concurrent.futures.

Currently my future calls time.sleep(secs).

It seems that Future.cancel() does less than I thought.

If the future is already executing, then time.sleep() does not get cancelled by it.

The same goes for the timeout parameter of wait(); it does not cancel my time.sleep().

How can I cancel a time.sleep() that is already executing inside a concurrent.futures worker?

For testing I use the ThreadPoolExecutor.
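
A minimal sketch of what I mean (one worker, a future that just sleeps):

import concurrent.futures
import time

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(time.sleep, 10)

time.sleep(0.1)                               # let the worker pick the task up
print(future.cancel())                        # False: already running, cannot be cancelled
concurrent.futures.wait([future], timeout=1)  # returns after 1 s, but...
print(future.done())                          # False: the timeout only stops waiting,
                                              # the worker keeps sleeping
executor.shutdown()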

Decorative answered 19/7, 2016 at 14:38 Comment(1)
Short answer: there is no way, and most probably using sleep in workers points to a design problem. Long answer: you can always implement a custom sleep that can be broken, but that is neither Pythonic nor correct. As an alternative, you can look at using a lock. – Wesle

If you submit a function to a ThreadPoolExecutor, the executor will run the function in a thread and store its return value in the Future object. Since the number of concurrent threads is limited, you have the option to cancel the pending execution of a future, but once control in the worker thread has been passed to the callable, there's no way to stop execution.

Consider this code:

import concurrent.futures as f
import time

T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
    time.sleep(5)
    return 1
q = T.submit(block5)
m = T.submit(block5)

print(q.cancel())  # Will fail, because q is already running
print(m.cancel())  # Will work, because q is blocking the only thread, so m is still queued

In general, whenever you want something to be cancellable, you yourself are responsible for making sure that it is.
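
As an illustration (a sketch, not part of the original answer; interruptible_sleep and cancel_event are made-up names), one way to make the wait itself cancellable is to block on a threading.Event with a timeout instead of calling time.sleep(), and to set the event when you want to abort:

import concurrent.futures as f
import threading

cancel_event = threading.Event()

def interruptible_sleep(secs):
    # Event.wait() returns True as soon as the event is set; otherwise it
    # times out after secs seconds, just like time.sleep(secs) would.
    if cancel_event.wait(timeout=secs):
        return 'interrupted'
    return 'slept the full time'

T = f.ThreadPoolExecutor(1)
future = T.submit(interruptible_sleep, 5)
cancel_event.set()        # "cancels" the sleep; the worker returns promptly
print(future.result())    # -> 'interrupted'
T.shutdown()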

There are some off-the-shelf options available, though. E.g., consider using asyncio; it also has an example using sleep. The concept circumvents the issue by returning control to an event loop running in the outermost context whenever a potentially blocking operation is about to be called, together with a note that execution should be continued whenever the result is available - or, in your case, after n seconds have passed.
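
A minimal sketch of that idea (assuming Python 3.7+; this is not the example from the asyncio documentation):

import asyncio

async def sleeper():
    await asyncio.sleep(5)    # cooperative sleep: yields to the event loop
    return 'finished'

async def main():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(1)    # let the sleeper start
    task.cancel()             # interrupts the pending asyncio.sleep(5)
    try:
        await task
    except asyncio.CancelledError:
        print('sleep was cancelled')

asyncio.run(main())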

Strong answered 25/7, 2016 at 7:59 Comment(14)
Oh what fun :-) I switched from multiprocessing to concurrent.futures (for other reasons). Now I am thinking about switching from concurrent.futures to asyncio ... :-). Nevertheless, Phillip, thank you for your answer! – Decorative
You're welcome πŸ˜‰ Btw, with multiprocessing, interrupting sleep was possible, because you can of course kill the other processes. – Strong
@Strong I thought I could use kill in concurrent.futures, too. I just need to switch from ThreadPoolExecutor to ProcessPoolExecutor. Or is this wrong? – Decorative
In theory, yes, but (a), the _processes attribute isn't documented and thus subject to change, and (b), after you've detected that your future is currently running and not finished, you'd have a race between the future finishing and you killing it -- if you lose, then you kill another (unrelated) task instead of the one you intended. – Strong
AFAIK linux increases the PID for each new process, cycling if it reaches the upper limit. It is very unlikely that this will happen. But you are right: It is a race condition. – Decorative
It's a process pool; Python does not spawn a new process for every task! – Strong
Are multiprocessing and concurrent.futures different in this area? AFAIK both use a process pool. – Decorative
With multiprocessing, you can use the Process class directly to insert your own safeguards. Then again, you can of course also subclass ProcessPoolExecutor and add the safe-termination capability. – Strong
I could not find anything about inserting safeguards in the official docs. Case 1: I am blind; case 2: the official docs don't cover this. If it is case 2: do you think this should be documented? I don't trust blog posts or other third-party resources; in the long run they become outdated and poorly maintained. – Decorative
Case 2, and I don't think this should be documented, because it is never best practice and only useful in a corner case, namely if you have a bunch of long-running tasks (otherwise there's no need to cancel anything, just let it continue and ignore the result) that you very rarely need to cancel (otherwise killing the process each time is overkill and you really should design your task to be cancellable). – Strong
@Strong I understand that asyncio and functional programming have become popular, but breaking time.sleep has nothing to do with asyncio. I will even say that if he uses sleep in a thread pool, it looks like he has a problem with his application design, and even asyncio will not save him. – Wesle
@Wesle Mind that the asyncio example uses asyncio.sleep, not time.sleep. time.sleep is only interruptible by sending a signal to the sleeping process, regardless of the Python interface. My point is that if you want to sleep in an interruptible fashion, you shouldn't use nanosleep(2) at all, but instead, e.g., select(2) on an eventfd(2) using a timeout. asyncio is one possible wrapper around this (and similar) APIs. – Strong
@Strong asyncio.sleep is used to give other "streams" a chance to be processed while the caller waits, and there is no guarantee that this happens in exactly 1 or 2 seconds (due to the nature of asyncio). Other implementations like eventfd are platform-dependent, and the question here is simply how to break time.sleep, not how to work around it to make this possible. – Wesle
@Wesle All asyncio functions are used to give other parts of the program a chance to be processed. That's the whole point and the reason why the module can serve as a means for the author to write interruptible algorithms. If you use asyncio correctly, asyncio.sleep does guarantee that you sleep for the given amount of time (up to the event loop's resolution). I disagree on the nature of the question. time.sleep was used by the author as an example; what he was asking for is how a (running) task whose result is encapsulated in a Future can be cancelled. – Strong

I do not know much about concurrent.futures, but you can use this logic to break the sleep: instead of one long time.sleep() or wait(), sleep in one-second slices inside a loop (assuming sleep has been imported from time; stop_requested here stands for any flag another thread can set):

for _ in range(secs):
    if stop_requested:  # a flag another thread sets to interrupt the wait
        break
    sleep(1)

Checking the flag on every iteration lets you break out of the loop and end the wait early.

Wirehaired answered 28/7, 2016 at 6:34 Comment(1)
Yes, this could work. It feels like the guy from Finland who just wanted to read his mails via a dial-up connection... hmmm, I need an event loop ... I need a scheduler ... and finally it's an OS. – Decorative

I figured it out.

Here is an example:

from concurrent.futures import ThreadPoolExecutor
import queue
import time

class Runner:
    def __init__(self):
        self.q = queue.Queue()
        self.exec = ThreadPoolExecutor(max_workers=2)

    def task(self):
        while True:
            try:
                # Block on the queue with a one-second timeout instead of
                # calling time.sleep(1): putting anything into the queue
                # wakes the worker immediately, so the wait is interruptible.
                self.q.get(block=True, timeout=1)
                break
            except queue.Empty:
                pass
            print('running')

    def run(self):
        self.exec.submit(self.task)

    def stop(self):
        self.q.put(None)  # wake the worker so it can exit
        self.exec.shutdown(wait=False, cancel_futures=True)  # cancel_futures needs Python 3.9+

r = Runner()
r.run()
time.sleep(5)
r.stop()
Radii answered 21/4, 2022 at 4:3 Comment(0)

I've faced this same problem recently. I had 2 tasks to run concurrently and one of them had to sleep from time to time. In the code below, suppose task2 is the one that sleeps.

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
executor.submit(task1)
executor.submit(task2)

executor.shutdown(wait=True)

In order to avoid the endless sleep I extracted task2 to run synchronously. I don't know whether it's good practice, but it's simple and fits my scenario perfectly.

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(task1)

task2()

executor.shutdown(wait=True)

Maybe it's useful to someone else.

Cerebral answered 25/8, 2021 at 20:30 Comment(0)

As written in the concurrent.futures documentation, you can use a with statement to ensure threads are cleaned up promptly, like the example below:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
Malindamalinde answered 7/1, 2021 at 10:9 Comment(1)
Clean response, but could you please explain the part where the time.sleep is happening? – Shadrach
