Python: Wait on all of `concurrent.futures.ThreadPoolExecutor`'s futures

I've given concurrent.futures.ThreadPoolExecutor a bunch of tasks, and I want to wait until they're all completed before proceeding with the flow. How can I do that without having to save all the futures and call wait on them? (I want an action on the executor.)

Peyton answered 15/1, 2014 at 16:40 Comment(2)
The executor has no concept of when it is done executing; that is the domain of the Future object. Can you explain further why you don't want to use the Future methods? There are a number of different ways to do it (one of them being wait, as you pointed out). – Vienne
@Vienne Actually it does; see my answer. However, it's probably not what the OP expects. – Guide

Just call Executor.shutdown:

shutdown(wait=True)

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError.

If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed.
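A minimal sketch of the explicit call (the task `slow_square` is a made-up placeholder). The futures list is kept only so the sketch can check the results afterwards; shutdown() itself needs no reference to them:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(n):
    # Hypothetical task: sleep briefly so work is still pending at shutdown time.
    time.sleep(0.01)
    return n * n

executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(slow_square, i) for i in range(20)]

# Blocks until every pending future has run and the worker threads are gone.
executor.shutdown(wait=True)

print(all(f.done() for f in futures))    # True
print(sum(f.result() for f in futures))  # 2470

# The executor cannot be reused afterwards.
try:
    executor.submit(slow_square, 1)
    rejected = False
except RuntimeError:
    rejected = True
print(rejected)  # True
```

Note that shutdown() is one-way: as the quoted documentation says, later submit() calls raise RuntimeError, so this fits code that is finished with the pool.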

However, if you keep track of your futures in a list, you can wait on them with the futures.wait() function instead and avoid shutting the executor down, so it stays available for future use:

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Wait for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a named 2-tuple of sets. The first set, named done, contains the futures that completed (finished or were cancelled) before the wait completed. The second set, named not_done, contains uncompleted futures.

Note that if you don't provide a timeout, it waits until all futures have completed.
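A sketch of the wait() route, assuming a placeholder task `work` that just sleeps. Because the executor is never shut down between batches, it stays usable, and the timeout variant shows a slow future landing in not_done:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def work(delay):
    # Hypothetical task: sleep for `delay` seconds and return it.
    time.sleep(delay)
    return delay

executor = ThreadPoolExecutor(max_workers=4)

# First batch: no timeout, so wait() returns only when all 8 are done.
batch = [executor.submit(work, 0.01) for _ in range(8)]
first_done, first_not_done = wait(batch, return_when=ALL_COMPLETED)
print(len(first_done), len(first_not_done))  # 8 0

# The executor was not shut down, so it accepts a second batch.
slow = executor.submit(work, 0.5)
done, not_done = wait([slow], timeout=0.05)
print(slow in not_done)  # True: the timeout expired before the task finished

executor.shutdown(wait=True)
```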

You can also use futures.as_completed() instead; however, you'd have to iterate over it.
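For comparison, a sketch with as_completed(), again with a placeholder task (`cube`). The loop only ends once every future has been yielded, so iterating it to the end amounts to waiting for all of them, and each result can be handled as it arrives:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def cube(n):
    # Hypothetical task.
    return n ** 3

executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(cube, n) for n in range(10)]

results = []
# Futures are yielded in completion order, not submission order.
for future in as_completed(futures):
    results.append(future.result())

print(sorted(results))  # [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

# The executor is still usable here; shut it down once you're done with it.
executor.shutdown()
```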

Guide answered 15/1, 2014 at 17:8 Comment(7)
So you're saying that if I use the executor as a context manager, then after the suite finishes, it'll wait until all futures are done, even if there are 10,000 of them and only 10 workers? – Peyton
@RamRachum Yes. From the documentation: "You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True):". – Guide
Does shutdown(wait=True) also wait for the futures' callbacks? – Frear
If you are using the with statement, shutdown is called automatically. – Ionogen
@Ionogen In my experience the executor isn't usually used as a context manager, because it is used from different parts of the code. – Guide
Note that this can't be used when the "top-level" futures add "children" futures to the same pool as they run: RuntimeError('cannot schedule new futures after shutdown',). Sigh. – Novelist
If the futures may submit new futures to the executor, shutdown(wait=True) doesn't seem to wait for them, but concurrent.futures.wait does. – Thickening
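A sketch of one workaround for tasks that submit children to the same executor. `track`, `parent` and `child` are hypothetical names. The idea: record every future in one shared list and repeat wait() until a pass finds no new entries. That works because a parent submits its children while it is still running, i.e. before its own future is marked done:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

executor = ThreadPoolExecutor(max_workers=4)
futures = []                     # every future submitted, parents and children
futures_lock = threading.Lock()
results = []                     # list.append is atomic in CPython

def track(fn, *args):
    """Hypothetical helper: submit to the shared executor and record the future."""
    future = executor.submit(fn, *args)
    with futures_lock:
        futures.append(future)
    return future

def child(n):
    results.append(("child", n))

def parent(n):
    results.append(("parent", n))
    # Children are submitted before this parent's future can be marked done.
    for i in range(3):
        track(child, n * 10 + i)

for n in range(5):
    track(parent, n)

# Keep waiting until a full pass discovers no newly added futures.
while True:
    with futures_lock:
        snapshot = list(futures)
    wait(snapshot)
    with futures_lock:
        if len(futures) == len(snapshot):
            break

print(len(results))  # 20 (5 parents + 15 children)
executor.shutdown()
```

This assumes parents never block on their children's results; with a bounded pool that could deadlock once every worker is busy waiting.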

As stated before, one can use Executor.shutdown(wait=True), but also pay attention to the following note in the documentation:

You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True):

import shutil
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
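A sketch showing the with form waiting for far more tasks than workers (10,000 tasks, 10 workers) without keeping any futures around, which is what the question asked for. `task` is a placeholder that just records its argument:

```python
from concurrent.futures import ThreadPoolExecutor

completed = []

def task(i):
    # Hypothetical task; list.append is atomic in CPython.
    completed.append(i)
    return i

with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(10_000):
        executor.submit(task, i)  # futures are deliberately not stored

# Leaving the block called executor.shutdown(wait=True), so every task has run.
print(len(completed))  # 10000
```

One caveat with discarding the futures: an exception raised inside a task is stored on its future, so if nobody calls result() it is silently lost.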
Phospholipide answered 1/9, 2020 at 7:50 Comment(0)

Bakuriu's answer is correct. To extend it a little: as we all know, a context manager has __enter__ and __exit__ methods. Here is how class Executor (ThreadPoolExecutor's base class) is defined:

class Executor(object):

    # other methods

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
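A quick way to see __exit__ at work, assuming a hypothetical subclass `RecordingExecutor` that only logs the wait argument shutdown() receives:

```python
from concurrent.futures import ThreadPoolExecutor

class RecordingExecutor(ThreadPoolExecutor):
    """Hypothetical subclass that records how shutdown() gets called."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.shutdown_calls = []

    def shutdown(self, wait=True, **kwargs):
        # **kwargs forwards cancel_futures on Python 3.9+.
        self.shutdown_calls.append(wait)
        super().shutdown(wait=wait, **kwargs)

with RecordingExecutor(max_workers=2) as executor:
    executor.submit(pow, 2, 10)

print(executor.shutdown_calls)  # [True]
```

The `**kwargs` pass-through is there because Python 3.9+ adds a keyword-only cancel_futures parameter to shutdown().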

And it is ThreadPoolExecutor that actually implements the shutdown method:

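# Excerpt from concurrent/futures/thread.py in older CPython 3.x releases
class ThreadPoolExecutor(_base.Executor):
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            # reject any further submit() calls
            self._shutdown = True
            # enqueue a None sentinel behind the pending work items;
            # each worker that sees it passes it on and exits
            self._work_queue.put(None)
        if wait:
            # block until every worker thread has finished and exited
            for t in self._threads:
                t.join()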
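To see why joining the threads ends up waiting for the queued work, here is a stripped-down toy pool using the same None-sentinel trick. It assumes simplified semantics (no futures, no exception handling); `ToyPool` is hypothetical and not the real class:

```python
import queue
import threading

class ToyPool:
    """Hypothetical minimal pool mimicking the sentinel-based shutdown."""

    def __init__(self, n_workers):
        self._work_queue = queue.Queue()
        self._threads = [threading.Thread(target=self._worker)
                         for _ in range(n_workers)]
        for t in self._threads:
            t.start()

    def _worker(self):
        while True:
            item = self._work_queue.get()  # idle workers block here
            if item is None:
                # Hand the sentinel on so the next worker also exits.
                self._work_queue.put(None)
                return
            fn, args = item
            fn(*args)

    def submit(self, fn, *args):
        self._work_queue.put((fn, args))

    def shutdown(self, wait=True):
        # FIFO queue: the sentinel lands behind every task already submitted,
        # so workers drain the pending work before they see it.
        self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()

results = []
pool = ToyPool(n_workers=3)
for i in range(100):
    pool.submit(results.append, i)
pool.shutdown(wait=True)
print(len(results))  # 100
```

In this toy, joining the threads without first queuing the sentinel would block forever, since idle workers sit in get() waiting for more work.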
Selfregulated answered 15/1, 2014 at 16:59 Comment(4)
As the name implies, this yields results as they are completed, i.e. it does not wait for the whole pool of tasks to finish, as the OP requested. – Vienne
@Vienne If you know how many tasks there are, why not count the finished ones? That said, there is no guarantee the program knows the number of concurrent tasks. – Selfregulated
@Vienne What is the difference between completed and finished? – Behre
What would happen if we just wrote for t in executor._threads: t.join() without any other code (i.e., without shutting down the Executor)? I know it wouldn't be legal anyway since _threads isn't public API, but I'm trying to understand the logic of thread creation/destruction in the Executor. Could this potentially hang forever, since new tasks are added to the executor and prevent the threads from ever finishing? – Gubernatorial
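The counting idea from the comments can be sketched with add_done_callback(), which also avoids keeping the futures around. `CompletionCounter` is a hypothetical helper, and it assumes the number of tasks is known up front:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class CompletionCounter:
    """Hypothetical helper: counts finished futures without storing them."""

    def __init__(self, expected):
        self._remaining = expected
        self._lock = threading.Lock()
        self._all_done = threading.Event()
        if expected == 0:
            self._all_done.set()

    def callback(self, _future):
        # Runs once per future when it finishes or is cancelled.
        with self._lock:
            self._remaining -= 1
            if self._remaining == 0:
                self._all_done.set()

    def wait(self, timeout=None):
        # True once every expected future has completed, False on timeout.
        return self._all_done.wait(timeout)

executor = ThreadPoolExecutor(max_workers=4)
results = []

n_tasks = 50
counter = CompletionCounter(n_tasks)
for i in range(n_tasks):
    executor.submit(results.append, i).add_done_callback(counter.callback)

print(counter.wait(timeout=5))  # True
print(len(results))             # 50
# The executor is still usable here; shut it down when you're done with it.
executor.shutdown()
```

If a future is already done when add_done_callback() is called, the callback runs immediately in the calling thread, so a fast task can't be missed by the count.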