Multiprocessing: use tqdm to display a progress bar

To make my code more "pythonic" and faster, I use multiprocessing and pass its map function a) the function to run and b) the range of iterations.

The straightforward solution (i.e., calling tqdm directly on the range: tqdm.tqdm(range(0, 30))) does not work with multiprocessing (as formulated in the code below).

The progress bar jumps from 0 to 100% immediately (presumably when Python consumes the range?) and does not reflect the actual progress of the map function.

How can one display a progress bar that shows how far along the map function actually is?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    p = Pool(2)
    r = p.map(_foo, tqdm.tqdm(range(0, 30)))
    p.close()
    p.join()

Any help or suggestions are welcome...

Islean answered 29/1, 2017 at 10:58 Comment(2)
Can you post the code snippet of the progress bar? – Hegelianism
For people searching for a solution with .starmap(): Here is a patch for Pool adding .istarmap(), which will also work with tqdm. – Kazantzakis

Solution found. Be careful! Due to multiprocessing, the time estimates (iterations per second, total time, etc.) can be unstable, but the progress bar works perfectly.

Note: Context manager for Pool is only available in Python 3.3+.

from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
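
To also collect the squared values, a question raised in the comments below, the same loop can simply append each result as it arrives. A minimal variation of the snippet above, reusing the same _foo:

from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    max_ = 30
    results = []
    with Pool(processes=2) as p:
        with tqdm(total=max_) as pbar:
            # imap_unordered yields results as workers finish them
            for result in p.imap_unordered(_foo, range(0, max_)):
                results.append(result)
                pbar.update()
    # note: results arrive in completion order, not input order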
Islean answered 29/1, 2017 at 14:26 Comment(8)
pbar.close() not required, it will be closed automatically on termination of with – Tewfik
Is the second/inner tqdm call necessary here? – Citreous
What about the output of _foo(my_number) that is returned as r in the question? – Herophilus
Is there a similar solution for starmap()? – Retinite
@Citreous - it seems to work without ;). Anyway, imap_unordered is key here; it gives the best performance and the best progress bar estimates. – Benton
Is enumerate really necessary? Why not for _ in p.imap_unordered(...):? – Pontifex
How do I retrieve the results with this solution? – Atman
Instead of return square in the function, append the result to a list. – Islean
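
Several comments ask about starmap(); apart from the istarmap() patch linked in the comments on the question, one workaround is a small top-level wrapper that unpacks argument tuples so that imap (or imap_unordered) can be used instead. A minimal sketch, with a made-up two-argument worker _foo2:

from multiprocessing import Pool
from tqdm import tqdm

def _foo2(a, b):
    return a * b

def _foo2_star(args):
    # unpack the tuple so imap can drive a multi-argument function
    return _foo2(*args)

if __name__ == '__main__':
    pairs = [(i, i + 1) for i in range(30)]
    with Pool(2) as p:
        results = list(tqdm(p.imap(_foo2_star, pairs), total=len(pairs)))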

Use imap instead of map, which returns an iterator of the processed values.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
Burgundy answered 24/7, 2017 at 9:25 Comment(13)
I tried this solution. It worked, but for some reason, the call to list() is necessary, as is passing the size of the list in the total= argument of tqdm(). Why is that? – Cambrian
An enclosing list() call waits for the iterator to end. total= is also required since tqdm does not know how long the iteration will be. – Burgundy
Is there a similar solution for starmap()? – Retinite
for i in tqdm.tqdm(...): pass may be more straightforward than list(tqdm.tqdm(...)) – Greasewood
This works, but has anyone else had it continuously print the progress bar on a new line for each iteration? – Quimby
If you encounter locking issues while trying this solution, try removing the tqdm.write() statements from your code. – Adel
The behaviour is weird with a specific chunk_size for p.imap. Can tqdm update every iteration instead of every chunk? – Marine
None of the answers worked for multithreading, though. Has anyone found a concise solution? – Teilo
The method works; however, each bar is updated on the same line (overlapping progress bars for different processes). Does anyone know how to solve this? – Hereunder
@Burgundy please can you elaborate more on why we need to call list()? Thanks – Ridicule
The "patch" solution for starmap can be found here: https://mcmap.net/q/76010/-starmap-combined-with-tqdm – Mandler
I don't think this solution works properly. It stays at 0% for almost all the time, then suddenly jumps to 100%. – Atman
@CarlosSouza That's because imap keeps order; the iterator won't skip unfinished items. You can use imap_unordered. – Singspiel
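
As the last comment explains, imap preserves input order, so one slow early task can hold the bar near 0% while later tasks finish. If the ordering of the results does not matter, the same one-liner works with imap_unordered and the bar advances as soon as any worker finishes:

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        # results come back in completion order here
        r = list(tqdm.tqdm(p.imap_unordered(_foo, range(30)), total=30))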

Sorry for being late, but if all you need is a concurrent map, I added this functionality in tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = process_map(_foo, range(0, 30), max_workers=2)

References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

It supports max_workers and chunksize and you can also easily switch from process_map to thread_map.

Marnie answered 25/1, 2020 at 0:29 Comment(18)
Cool (+1), but throws HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value=''))) in Jupyter – Fustigate
@Ébe-Isaac see github.com/tqdm/tqdm/issues/937 – Marnie
I see an issue with discussion on hacking tqdm_notebook; however, I can't work out a solution for tqdm.contrib.concurrent. – Fustigate
How do I close the process with process_map? Something like p.close() and p.join()? – Rasbora
@Rasbora process_map creates, runs, closes/joins and returns a list. – Marnie
This is great! So glad I found it. One question remains: when I use this in a Jupyter notebook, it doesn't work very well. I know there is a tqdm.notebook; is there some way to merge the two? – Eduardo
I report the same issues when using this in a Jupyter notebook. In particular, it crashes for thread_map – Poinciana
This makes unconditional copies of the iterated arguments, while the others seem to do copy-on-write. – Volvox
Hmm.. finishes while the progress bar is stuck at zero. – Kozhikode
If I pass in some kwargs (for instance initargs and initializer, which are kwargs for multiprocessing.Pool), does the wrapper pass them on to the Pool instance created? I can see it does pass through max_workers and chunksize. – Timothea
@Eduardo @Vladimir Vargas I don't have any issues if I do something like e.g. thread_map(fn, *iterables, tqdm_class=tqdm.notebook.tqdm, max_workers=12) in a Jupyter Notebook today. – Implacable
Using this with requests gives me the wrong number of finished iterations. I prefer @Islean's solution – Saturation
When I try this, my progress bar is stuck at zero and never updates. – Epp
Hi, I'm using thread_map and the bar looks weird, with unrecognized characters ("?" in a box) in the output, and it shows 0%. I'm using it in the default Windows command line. – Landel
Works perfectly here! Thanks – Bobbibobbie
Bizarre to include this functionality in tqdm. Talk about scope creep – Departure
This produces "OSError: handle is closed" every time when using it to read files in parallel on Mac in Python 3.11 run from Jupyter. It seems to work OK on Linux, but not Mac. Seems to be a bug in ProcessPoolExecutor. I haven't found a fix other than to just not use tqdm...process_map anymore. :-( – Lunisolar
For me it works 50% slower than just map with no tqdm. I might have messed up something, but with the same run settings on the same data I was getting 430 vs 250 seconds runtime. – Cedar
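
For the Jupyter issues reported above, one comment suggests passing the notebook bar explicitly via tqdm_class; a sketch of that approach (assuming ipywidgets is installed, which tqdm.notebook requires), run as a notebook cell:

import time

from tqdm.contrib.concurrent import thread_map
import tqdm.notebook

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

# tqdm_class swaps in the widget-based notebook bar
r = thread_map(_foo, range(30), max_workers=2, tqdm_class=tqdm.notebook.tqdm)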

You can use p_tqdm instead.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = p_map(_foo, list(range(0, 30)))
Spinifex answered 26/3, 2019 at 22:8 Comment(7)
This works extremely well, and it was very easy to pip install. This is replacing tqdm for most of my needs – Juxon
Merci Victor ;) – Unlovely
p_tqdm is limited to multiprocessing.Pool, not available for threads – Cresida
Can I specify the number of workers for p_map? – Gunman
@VictorWang Yes, use num_cpus like this => p_map(_foo, list(range(0, 30)), num_cpus=5) – Bowles
How do I set a description for the progress bar? In tqdm there is an argument called desc which takes the description; I could not find something like that for p_tqdm. – Tessy
@VandanRevanur You can pass kwargs to p_tqdm and they will be forwarded to tqdm, like so: github.com/swansonk14/p_tqdm/issues/5 – Spinifex
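
Putting the last two comments together: num_cpus caps the worker count, and extra keyword arguments such as desc are forwarded on to tqdm (per the issue linked above). A sketch combining both:

from p_tqdm import p_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    # num_cpus limits the pool; desc is passed through to tqdm
    r = p_map(_foo, list(range(0, 30)), num_cpus=5, desc='squares')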

Based on the answer of Xavi Martínez, I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered, with the only difference that a progress bar is shown.

from multiprocessing import Pool
import time
from tqdm import tqdm

def imap_unordered_bar(func, args, n_processes=2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total=len(args)) as pbar:
        # update the bar once per completed task, in completion order
        for res in p.imap_unordered(func, args):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Wheel answered 12/8, 2017 at 16:56 Comment(3)
This will redraw the bar at each step on a new line. How can I update the same line? – Goldengoldenberg
Solution in my case (Windows/PowerShell): Colorama. – Goldengoldenberg
pbar.close() is not required; it will be closed automatically on termination of the with block, as Sagar commented on @scipy's answer. – Camisole
import multiprocessing as mp
import tqdm


iterable = ...
num_cpu = max(1, mp.cpu_count() - 2)  # don't use all CPUs, but keep at least one worker


def func(x):
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))
Psittacine answered 22/11, 2019 at 15:18 Comment(0)

For a progress bar with apply_async, we can use the following code, as suggested in:

https://github.com/tqdm/tqdm/issues/484

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

if __name__ == '__main__':
    pool = Pool(2)
    pbar = tqdm(total=100)

    def update(*a):
        # called in the main process once per completed task
        pbar.update()

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update)
    pool.close()
    pool.join()
    pbar.close()
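
If the results themselves are needed, the AsyncResult handles returned by apply_async can be kept and read back after join(); a minimal sketch of that variation:

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

if __name__ == '__main__':
    with Pool(2) as pool, tqdm(total=100) as pbar:
        # keep the AsyncResult handles so the values can be read back
        jobs = [pool.apply_async(myfunc, args=(i,),
                                 callback=lambda _: pbar.update())
                for i in range(100)]
        pool.close()
        pool.join()
        results = [job.get() for job in jobs]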
Gassy answered 15/4, 2021 at 20:7 Comment(0)

Here is my take for when you need to get results back from your parallel functions. This function does a few things (there is another post of mine that explains it further), but the key point is that there is a queue of pending tasks and a queue of completed tasks. As workers finish each task in the pending queue, they add the result to the completed queue. You can wrap the check on the completed-tasks queue with the tqdm progress bar. I am not putting the implementation of the do_work() function here; it is not relevant, since the point is to monitor the completed-tasks queue and update the progress bar every time a result comes in.

import pickle

import multiprocessing as mp
import psutil
from tqdm import tqdm

# SENTINEL and do_work() are defined elsewhere (see the note above)

def par_proc(job_list, num_cpus=None, verbose=False):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that ALL
    # processes exit when there is no more work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0

    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            bar.update()  # advance by one per completed task

    for p in processes:
        p.join()

    return results
Rumilly answered 20/8, 2020 at 20:33 Comment(0)

Based on "user17242583" answer, I created the following function. It should be as fast as Pool.map and the results are always ordered. Plus, you can pass as many parameters to your function as you want and not just a single iterable.

from multiprocessing import Pool
from functools import partial
from tqdm import tqdm


def imap_tqdm(function, iterable, processes, chunksize=1, desc=None, disable=False, **kwargs):
    """
    Run a function in parallel with a tqdm progress bar and an arbitrary number of arguments.
    Results are always ordered and the performance should be the same as of Pool.map.
    :param function: The function that should be parallelized.
    :param iterable: The iterable passed to the function.
    :param processes: The number of processes used for the parallelization.
    :param chunksize: The iterable is chopped into chunks of this size, which are submitted to the process pool as separate tasks.
    :param desc: The description displayed by tqdm in the progress bar.
    :param disable: Disables the tqdm progress bar.
    :param kwargs: Any additional arguments that should be passed to the function.
    """
    if kwargs:
        function_wrapper = partial(_wrapper, function=function, **kwargs)
    else:
        function_wrapper = partial(_wrapper, function=function)

    results = [None] * len(iterable)
    with Pool(processes=processes) as p:
        with tqdm(desc=desc, total=len(iterable), disable=disable) as pbar:
            for i, result in p.imap_unordered(function_wrapper, enumerate(iterable), chunksize=chunksize):
                results[i] = result
                pbar.update()
    return results


def _wrapper(enum_iterable, function, **kwargs):
    i = enum_iterable[0]
    result = function(enum_iterable[1], **kwargs)
    return i, result
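
A hypothetical usage example; the offset keyword argument here is made up for illustration and is forwarded to the worker through **kwargs:

import time

def _foo(my_number, offset=0):
    time.sleep(1)
    return my_number * my_number + offset

if __name__ == '__main__':
    # offset reaches _foo via the **kwargs of imap_tqdm
    results = imap_tqdm(_foo, list(range(30)), processes=2, desc='squares', offset=1)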
Seism answered 7/9, 2022 at 12:12 Comment(0)

tqdm has released its own simple, elegant APIs for concurrency.

The following snippet is a straightforward example illustrating multithreading.

import time

from tqdm.contrib.concurrent import thread_map

def f(row):
    x, y = row
    time.sleep(1)  # to visualize the progress

thread_map(f, [(x, y) for x, y in zip(range(1000), range(1000))])
Brimstone answered 18/11, 2023 at 1:37 Comment(0)

This approach is simple and it works.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()  # pbar is shared as a global across the worker threads

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Landa answered 10/6, 2019 at 14:17 Comment(0)
