How can we use tqdm in a parallel execution with joblib?
I want to run a function in parallel, and wait until all parallel nodes are done, using joblib. Like in the example:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

But I want the execution to be shown in a single progress bar with tqdm, indicating how many jobs have been completed.

How would you do that?

Convict answered 14/6, 2016 at 6:17 Comment(2)
danshiebler.com/2016-09-14-parallel-progress-bar Maybe this site can help you.Gambit
See niedakh's answer just underneath!Train
If your problem consists of many parts, you could split them into k chunks, run each chunk in parallel, and update the progress bar in between, resulting in k updates of the progress.

This is demonstrated in the following example from the documentation.

>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers
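A minimal sketch of that chunking idea combined with an actual tqdm bar (the chunk size and workload here are arbitrary choices for illustration):

```python
from math import sqrt
from joblib import Parallel, delayed
from tqdm import tqdm

items = list(range(100))
chunk_size = 10  # k = 10 chunks; pick what suits your workload
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

results = []
with Parallel(n_jobs=2) as parallel, tqdm(total=len(items)) as pbar:
    for chunk in chunks:
        results.extend(parallel(delayed(sqrt)(x ** 2) for x in chunk))
        pbar.update(len(chunk))  # one bar update per completed chunk
```

The bar only advances k times rather than per task, but it does give a single progress bar over the whole run.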

Dymoke answered 14/6, 2016 at 21:37 Comment(2)
How does this answer the question about "a single progressbar"?Pauper
This absolutely doesn't answer the question about the progress bar thoughPurpura
Just put range(10) inside tqdm(...)! It may seem too good to be true, but it really works (on my machine):

from math import sqrt
from joblib import Parallel, delayed  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))
Jacklin answered 19/6, 2018 at 10:5 Comment(5)
This only shows progress when the process starts, not when it is finished: Parallel(n_jobs=10)(delayed(time.sleep)(i ** 2) for i in tqdm(range(10)))Ellis
It works, but not with a list of strings for example... Also tried wrapping the list in iter...Polacre
@Polacre Try putting the list into a generator; the following seems to work for me:

from math import sqrt
from joblib import Parallel, delayed
from tqdm import tqdm

rng = ['a', 'b', 'c', 'd']
for j in range(20):
    rng += rng

def get_rng():
    for item in rng:
        yield item

result = Parallel(n_jobs=2)(delayed(sqrt)(len(i) ** 2) for i in tqdm(get_rng()))
Jacklin
In another question, there is a very elegant solution to this problem.Lareine
This won't work; tqdm will go to 100% immediately.Percipient
I've created pqdm, a parallel tqdm wrapper built on concurrent.futures, to get this done comfortably. Give it a try!

To install

pip install pqdm

and use

from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work

def square(a):
    return a*a

result = pqdm(args, square, n_jobs=2)
Lamoree answered 8/3, 2020 at 21:56 Comment(7)
Well done! Can't understand why you're not the accepted answer. Big thank you!Train
Unfortunately this fails for me. I am not sure why, but it looks like pqdm does not wait until the end of the function calls. I do not have time now to create a MWE. Still, thanks for the effort (and +1).Unclinch
@YairDaon it might work with the bounded executor; try adding bounded=True to pqdm.Lamoree
Does it work over list comprehensions?Taunt
Interesting library. Can you tell us in what way it differs or is better than p_tqdm which is found at github.com/swansonk14/p_tqdm Seems like the latter is more developed.Lanfri
pqdm does a similar thing, but pqdm does not depend on pathos, and you can easily exchange tqdm variants (like using slack_tqdm or discord_tqdm instead of the main tqdm.auto). Not sure what you mean by more developed; I'm a fan of simple tools for simple tasks, so pqdm stays lightweight and as dependency-free as possible.Lamoree
This is a great library! It's such a breeze to useIncapacity
Modifying nth's great answer to permit a dynamic flag to use tqdm or not, and to specify the total ahead of time so that the status bar fills in correctly:

from tqdm.auto import tqdm
from joblib import Parallel

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()
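For reference, a self-contained usage sketch (the class above is repeated so the snippet runs standalone; the sqrt workload is an arbitrary example):

```python
from math import sqrt
from joblib import Parallel, delayed
from tqdm.auto import tqdm

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

# total matches the number of tasks, so the bar fills to 100%
results = ProgressParallel(n_jobs=2, total=100)(
    delayed(sqrt)(i ** 2) for i in range(100)
)
```

Passing use_tqdm=False (or leaving total unset) falls back to the behavior described above: the bar is disabled, or its total grows as tasks are dispatched.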
Unhopedfor answered 19/5, 2020 at 20:48 Comment(1)
For anyone wondering (like me), adding this to Joblib itself as an optional feature has been discussed on the issue tracker: <github.com/joblib/joblib/issues/972>Rehm
There is no need to install additional packages. You can use tqdm's native support in tqdm.contrib.concurrent: https://tqdm.github.io/docs/contrib.concurrent/

from tqdm.contrib.concurrent import process_map
# If you want threads instead:
# from tqdm.contrib.concurrent import thread_map
import time

args = range(5)

def square(a):
    time.sleep(a)
    return a*a

result = process_map(square, args, max_workers=2)
Sydelle answered 19/8, 2022 at 10:40 Comment(2)
I don't know why no upvotes. This is one of the best answers for doing this kind of thing.Turnstile
By far the simplest solutionEdgardo
As noted above, solutions that simply wrap the iterable passed to joblib.Parallel() do not truly monitor the progress of execution. Instead, I suggest subclassing Parallel and overriding the print_progress() method, as follows:

import joblib
from tqdm.auto import tqdm

class ProgressParallel(joblib.Parallel):
    def __call__(self, *args, **kwargs):
        with tqdm() as self._pbar:
            return joblib.Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()
Mantooth answered 4/4, 2020 at 12:3 Comment(0)
Here's a possible workaround:

import random
import time

from joblib import Parallel, delayed
from tqdm import tqdm

def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progressbar(seq, total=None):
    step = 1
    tick = time.time()
    while True:
        time_diff = time.time() - tick
        avg_speed = time_diff / step
        total_str = 'of %s' % total if total else ''
        print('step', step, '%.2f' % time_diff,
              'avg: %.2f sec/iter' % avg_speed, total_str)
        step += 1
        yield next(seq)

all_bar_funcs = {
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progressbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,
}

def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
Durgy answered 4/11, 2016 at 4:49 Comment(1)
It is a workaround, but the progress bar updates only when a task is dispatched. The better time to update the progress bar is when a task is completed.Elytron
I created tqdm_joblib to solve this issue.

Installation: pip install tqdm-joblib

From the readme:

A simple snippet copied from https://mcmap.net/q/270869/-tracking-progress-of-joblib-parallel-execution, packaged for easy reuse.

from math import sqrt
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib

with tqdm_joblib(desc="My calculation", total=10) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
Skylab answered 24/8, 2022 at 22:2 Comment(1)
This is definitely the easiest way to do it. As a suggestion, you can edit your answer and mention how to install it: pip install tqdm-joblib from pypi.org/project/tqdm-joblibShannan
None of the other answers, including the classes by user394430 or nth, worked for me.

But this answer from a similar question works perfectly. Reposting for ease.

import contextlib
import joblib
from tqdm import tqdm

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()

Then wrap as a context manager

from math import sqrt
from joblib import Parallel, delayed

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

Works with versions:

  • joblib - 1.2.0
  • tqdm - 4.64.1
  • python - 3.9.13
Grandstand answered 9/11, 2022 at 15:32 Comment(0)
Copied from this answer:


As of joblib v1.3.0, released in June 2023, there's an easier way to wrap joblib.Parallel with the tqdm progress bar (inspired by this comment).

This progress bar will track job completion, not job enqueueing. Previously this required a special context manager. Here's an example:

from joblib import Parallel, delayed
from tqdm import tqdm

import time
import random

# Our example worker will sleep for a certain number of seconds.

inputs = list(range(10))
random.shuffle(inputs)

def worker(n_seconds):
    time.sleep(n_seconds)
    return n_seconds

# Run the worker jobs in parallel, with a tqdm progress bar.
# We configure Parallel to return a generator.
# Then we wrap the generator in tqdm.
# Finally, we execute everything by converting the tqdm generator to a list.

outputs = list(
    tqdm(
        # Note the new return_as argument here, which requires joblib >= 1.3:
        Parallel(return_as="generator", n_jobs=3)(
            delayed(worker)(n_seconds) for n_seconds in inputs
        ),
        total=len(inputs),
    )
)
print(outputs)
Auscultate answered 6/2 at 15:37 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.