Use tqdm with concurrent.futures?
A

6

103

I have a multithreaded function that I would like a status bar for using tqdm. Is there an easy way to show a status bar with ThreadPoolExecutor? It is the parallelization part that is confusing me.

import concurrent.futures

def f(x):
    return x**2

my_iter = range(1000000)

def run(f, my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(f, my_iter))
    return results

run(f, my_iter) # wrap tqdm around this function?
Affective answered 30/7, 2018 at 20:21 Comment(1)
you can use from tqdm.contrib.concurrent import process_map see #41920624Adulation
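A minimal sketch of what the comment above suggests, assuming tqdm 4.42 or newer, where tqdm.contrib.concurrent is available. It uses thread_map, the thread-based sibling of process_map, since the question is about threads. The function name square and the max_workers value are illustrative choices, not part of the original post.

```python
from tqdm.contrib.concurrent import thread_map  # process_map has the same call shape

def square(x):
    # Hypothetical stand-in for the real work function
    return x ** 2

# thread_map runs square over the inputs in a ThreadPoolExecutor,
# draws a progress bar, and returns a list of results in input order.
results = thread_map(square, range(1000), max_workers=4)
```

With process_map the work function must be picklable, and on platforms that spawn worker processes the call should sit under an if __name__ == "__main__": guard.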
J
142

You can wrap tqdm around the executor.map call as follows to track the progress:

list(tqdm(executor.map(f, iter), total=len(iter)))

Here is your example:

import time  
import concurrent.futures
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2

def run(f, my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(tqdm(executor.map(f, my_iter), total=len(my_iter)))
    return results

my_iter = range(100000)
run(f, my_iter)

And the result is like this:

16%|██▏           | 15707/100000 [00:00<00:02, 31312.54it/s]
Jaramillo answered 9/9, 2018 at 9:1 Comment(8)
Thank you! The key seems to be the list() around tqdm, why is that the case?Jinnyjinrikisha
@DreamFlasher: That is because tqdm only advances when it is iterated. Executor.map itself only returns a lazy iterator, so list() is what consumes it.Triboluminescent
This way you do not get the output immediately; you have to wait until the whole run has completed before you see the full result.Ammonal
the total argument in tqdm is important. Without it, we can not see the overall progress.Number
This blocks time updates in the progress bar, is there a way to fix it?Fossa
Just call update(0) to update the timeFossa
Doesn't work for me. No progress bar shows at all. I have to put tqdm into the function I'm running which results in one progress bar for each process.Boleyn
To get ordered results as they come in (and update the tqdm accordingly), use multiprocessing.pool.ThreadPool.imap instead of concurrent.futures.ThreadPoolExecutor.map (which has some caveats).Priority
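The last comment can be sketched roughly as follows. This is an illustration under assumptions, not a tested replacement for the accepted answer: the helper name run_imap and the worker count are made up here, and f is the same toy function used above.

```python
import time
from multiprocessing.pool import ThreadPool
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x ** 2

def run_imap(f, my_iter, workers=8):
    # imap yields results lazily and in input order, so the bar
    # advances as list() consumes them.
    with ThreadPool(workers) as pool:
        # Consume inside the with block: leaving it terminates the pool.
        return list(tqdm(pool.imap(f, my_iter), total=len(my_iter)))

results = run_imap(f, range(200))
```

Because imap preserves input order, results[i] corresponds to the i-th input.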
A
85

The problem with the accepted answer is that ThreadPoolExecutor.map is obliged to return results in the order in which the tasks were submitted, not in the order in which they become available. So if the first invocation of f happens to be, for example, the last one to complete, the progress bar will go from 0% to 100% all at once, and only when all of the calls have completed. It is much better to use ThreadPoolExecutor.submit with as_completed:

import time
import concurrent.futures
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2

def run(f, my_iter):
    l = len(my_iter)
    with tqdm(total=l) as pbar:
        # let's give it some more threads:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = {executor.submit(f, arg): arg for arg in my_iter}
            results = {}
            for future in concurrent.futures.as_completed(futures):
                arg = futures[future]
                results[arg] = future.result()
                pbar.update(1)
    print(321, results[321])

my_iter = range(100000)
run(f, my_iter)

Prints:

321 103041

This is just the general idea. Depending on the type of my_iter, it may not be possible to apply the len function to it directly without first converting it to a list. The main point is to use submit with as_completed.
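One way to handle an iterable that has no len, sketched under the assumption that all tasks can be submitted up front: count the futures rather than the inputs. The name run_any_iterable is hypothetical. As in the code above, results are keyed by argument, so arguments must be hashable and unique.

```python
import concurrent.futures
from tqdm import tqdm

def f(x):
    return x ** 2

def run_any_iterable(f, my_iter, max_workers=10):
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submitting consumes the iterable, so a generator works too.
        futures = {executor.submit(f, arg): arg for arg in my_iter}
        # The number of futures equals the number of inputs.
        with tqdm(total=len(futures)) as pbar:
            for future in concurrent.futures.as_completed(futures):
                results[futures[future]] = future.result()
                pbar.update(1)
    return results

# A generator expression has no len(), but this still works.
squares = run_any_iterable(f, (n for n in range(500)))
```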

Abixah answered 10/9, 2020 at 17:28 Comment(8)
Thanks! This really helped, but for some reason the progress bar stopped after a while?Myrilla
Just wanted to mention that with minor modifications (move to def main()) this works just as well with the ProcessPoolExecutor, which can be much faster if f(x) actually does computation since it is not affected by the global interpreter lock.Povertystricken
Since someone just asked me, here is the code of the example adapted for the ProcessPoolExecutor gist.github.com/ltalirz/9220946c5c9fd920a1a2d81ce7375c47Povertystricken
@Povertystricken Of course, if it weren't for the call to sleep that was added solely to "visualize the result", function f is really a poor candidate even for multiprocessing since it is not CPU-intensive enough to justify the added overhead (that is, just calling f in a loop would be faster). The real point of the question as I understood was really about how to update the progress bar. But for what it's worth, with the call to sleep, multithreading does better than multiprocessing with this particular f function due to its reduced overhead.Abixah
This blocks time updates in the progress bar, is there a way to fix it?Fossa
Just call update(0) to update the timeFossa
This is a good solution. However, I have a problem where the progress bar does not update if the above run function is executed more than once in the same jupyter notebook instance. Can anyone else confirm this problem and/or offer a solution?Liberalize
Because it took me way too long to figure this out: make sure your as_completed() call is inside the executor context.. facepalmRoadwork
S
6

The shortest way, I think:

with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(tqdm(executor.map(myfunc, range(len(my_array))), total=len(my_array)))
Systematize answered 27/4, 2020 at 17:19 Comment(0)
S
4

I tried the example, but the progress bar still failed. I then found this post, which seems useful as a short way to do it:

def tqdm_parallel_map(fn, *iterables):
    """Use tqdm to show progress; yields results in completion order."""
    # The with block shuts the pool down once all results have been yielded.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures_list = []
        for iterable in iterables:
            futures_list += [executor.submit(fn, i) for i in iterable]
        for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list)):
            yield f.result()


def multi_cpu_dispatcher_process_tqdm(data_list, single_job_fn):
    """Multi-CPU dispatcher."""
    output = []
    for result in tqdm_parallel_map(single_job_fn, data_list):
        # Assumes each job returns a list; use output.append(result) otherwise.
        output += result
    return output
Skiff answered 14/3, 2022 at 9:53 Comment(0)
A
2

I find it more intuitive to use the update() method of tqdm, which keeps a human-readable structure:

with tqdm(total=len(mylist)) as progress:                         
    with ThreadPoolExecutor() as executor:
        for __ in executor.map(fun, mylist):
            progress.update() # update the progress bar each time a job finishes

Since I don't care about the output of fun I use __ as throwaway variable.

Aekerly answered 8/2, 2023 at 11:44 Comment(3)
In python _ is throwaway variable not __Town
@Town Every valid variable name can be a throwaway variable. Python also stores the last value returned in the shell in _. This is sometimes useful for debugging a program, so I always use __.Aekerly
by convention _ is used because it gets overwritten often and thus you do not risk it staying indefinitely and never garbage collected. See: #5893663Town
A
0

Just an addendum to the accepted answer:

# works
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = executor.map(f, my_iter)
    result = list(tqdm(futures, total=len(my_iter)))

# does NOT work (only updates at the very end)
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = executor.map(f, my_iter)
result = list(tqdm(futures, total=len(my_iter)))

Makes sense, but I'm likely not the only one to have tried the second approach...
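A small timing sketch of why consuming the results outside the with block only updates at the end: leaving the block calls executor.shutdown(wait=True), which blocks until every task has finished. The names slow, waited and drain, and the sleep durations, are illustrative only, and tqdm is left out to keep the effect visible in plain numbers.

```python
import concurrent.futures
import time

def slow(x):
    time.sleep(0.05)  # pretend to do some work
    return x

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    pending = executor.map(slow, range(4))
# Exiting the block waited for all four tasks to finish.
waited = time.perf_counter() - start

start = time.perf_counter()
values = list(pending)  # every result is already available here
drain = time.perf_counter() - start

print(f"blocked in with-block: {waited:.3f}s, consuming afterwards: {drain:.3f}s")
```

A progress bar wrapped around pending at that point has nothing left to wait for, so it jumps straight to 100%.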

Avent answered 23/11, 2023 at 18:17 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.