Tracking progress of joblib.Parallel execution

Is there a simple way to track the overall progress of a joblib.Parallel execution?

I have a long-running execution composed of thousands of jobs, which I want to track and record in a database. To do that, whenever Parallel finishes a task, I need it to execute a callback reporting how many jobs are left.

I've accomplished a similar task before with Python's stdlib multiprocessing.Pool, by launching a thread that records the number of pending jobs in Pool's job list.
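
For reference, that trick looked roughly like this (a minimal sketch; _cache is a private attribute of Pool, so it may differ between Python versions):

import multiprocessing
import threading
import time

def monitor(pool, total):
    # Poll the pool's internal job cache until everything has completed.
    # pool._cache maps job ids to pending ApplyResult objects.
    while pool._cache:
        print("remaining jobs: {}/{}".format(len(pool._cache), total))
        time.sleep(1)

def work(x):
    time.sleep(0.5)
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        results = [pool.apply_async(work, (i,)) for i in range(20)]
        monitor_thread = threading.Thread(target=monitor, args=(pool, len(results)))
        monitor_thread.start()
        values = [r.get() for r in results]
        monitor_thread.join()
    print(values)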

Looking at the code, Parallel uses Pool under the hood, so I thought I could pull off the same trick, but it doesn't seem to use that job list, and I haven't figured out any other way to "read" its internal status.

Ayer answered 27/7, 2014 at 17:20 Comment(0)

Why can't you simply use tqdm? The following worked for me:

from joblib import Parallel, delayed
from tqdm import tqdm

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000)))
100%|██████████| 1000/1000 [00:00<00:00, 10563.37it/s]
Kenward answered 20/4, 2018 at 22:59 Comment(10)
I don't think this is actually monitoring the completion of running jobs, just the queuing of jobs. If you were to insert a time.sleep(1) at the start of myfun you would find the tqdm progress finishes almost instantly, but results takes a few more seconds to populate.Clerissa
Yes, that’s partly correct. It is tracking the job starts vs the completions, but the other issue is that there is also a delay caused by overhead after all jobs are completed. Once all tasks are completed results need to be collected and this can take quite a while.Kenward
I believe this answer doesn't really answer the question. As it was mentioned, one will track queuing and not the execution itself with this approach. The approach with callback shown below seems to be more precise in relation to the question.Stacistacia
@Stacistacia yes, that was addressed in the former comment.Kenward
This answer is incorrect, as it does not answer the question. This answer should be unaccepted.Leena
The provided answer by frenzykryger below contains a great solution to the problem of this answer.Achieve
It's wrong. It only counts job starts, which happen immediately.Mcneil
This worked for me with a reasonably complex logistic regression function called on thousands of probes in parallel: stats = parallel(func(data, phenotype) for data in tqdm(meth_data, total=len(all_probes), desc='Probes') ) [meth_data is a dataframe and I'm passing each column through the function]Portfolio
Oct 2022 and this wrong answer is still the accepted answer. This will just show the progress of start of jobs. @Ayer please change the accepted answer.Ikhnaton
While this answer is indeed technically wrong, as several comments have pointed out, it's still useful: it's the simplest solution, way easier than the other answers, and when I'm using it with a large number of short jobs, completion is not long after queueing, so in some cases it can be good enough.Thagard

Yet another step ahead of dano's and Connor's answers is to wrap the whole thing as a context manager:

import contextlib
import joblib
from tqdm import tqdm

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()

Then you can use it like this, without leaving monkey-patched code behind once you're done:

from math import sqrt
from joblib import Parallel, delayed

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

which I think is awesome, and it looks similar to the tqdm pandas integration.

Amandaamandi answered 19/11, 2019 at 14:50 Comment(5)
Excellent solution. Tested with joblib 0.14.1 and tqdm 4.41.0 -- works great. This would be a great addition to tqdm!Neuroblast
I can't edit it, but minor typo in solution where joblib.parallel.BatchCompletionCallback is actually BatchCompletionCallBack (note the camelcase on CallBack)Whereon
I just posted this code to PyPI: github.com/louisabraham/tqdm_joblib Now you can just pip install tqdm_joblib and from tqdm_joblib import tqdm_joblibGrandmotherly
i think this is no longer workingIndeciduous
Fantastic, this works out of the box, thank you, featuredpeow and AlanSTACK! I have also successfully tested using a Parallel context within this context, so with tqdm_joblib() as progress_bar: with Parallel as parallel: <code>Counterreply

The documentation you linked to states that Parallel has an optional progress meter. It's implemented by using the callback keyword argument provided by multiprocessing.Pool.apply_async:

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

And here's print_progress:

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

The way they implement this is kind of weird, to be honest - it seems to assume that tasks will always be completed in the order that they're started. The index variable that goes to print_progress is just the self.n_dispatched variable at the time the job was actually started. So the first job launched will always finish with an index of 0, even if say, the third job finished first. It also means they don't actually keep track of the number of completed jobs. So there's no instance variable for you to monitor.

I think your best bet is to make your own CallBack class, and monkey patch Parallel:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

Output:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

That way, your callback gets called whenever a job completes, rather than the default one.
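
Since the original goal was to record progress in a database, the patched callback is also a natural place to hook that in. A minimal sketch along the same lines (targeting the same old joblib internals as above; record_progress is a hypothetical stand-in for your own database write):

from collections import defaultdict
import joblib.parallel

def record_progress(done, total):
    # hypothetical helper: replace with your actual database write
    print("recorded {}/{} jobs completed".format(done, total))

class DBCallBack(object):
    completed = defaultdict(int)
    total = 0  # set to the number of jobs before calling Parallel

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, out):
        DBCallBack.completed[self.parallel] += 1
        record_progress(DBCallBack.completed[self.parallel], DBCallBack.total)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

joblib.parallel.CallBack = DBCallBack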

Questionless answered 27/7, 2014 at 18:24 Comment(2)
Great research, thanks. I didn't notice the callback attribute.Ayer
I found the documentation of joblib is very limited. I have to dig into the source code for this CallBack class. My question: can I customize the arguments when __call__ is called? (sub-classing the whole Parallel class may be one way but it's heavy for me).Inconsiderate

Expanding on dano's answer for newer versions of the joblib library, which made a couple of changes to the internal implementation: the callback class is now called BatchCompletionCallBack, it receives a dispatch timestamp and a batch size, and it is invoked once per completed batch rather than once per job.

from joblib import Parallel, delayed
from collections import defaultdict

# patch joblib progress callback
class BatchCompletionCallBack(object):
    completed = defaultdict(int)

    # joblib instantiates this as (dispatch_timestamp, batch_size, parallel)
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.batch_size = batch_size
        self.parallel = parallel

    # invoked once per completed batch, with the batch result
    def __call__(self, out):
        BatchCompletionCallBack.completed[self.parallel] += 1
        print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
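
With the patch applied, any subsequent Parallel call reports completions. Note that the count is per completed batch; with joblib's default auto-batching this starts at one job per batch but can grow for fast tasks:

from math import sqrt
print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))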
Dropkick answered 23/1, 2017 at 20:28 Comment(0)

As of joblib v1.3.0, released in June 2023, there's an easier way to wrap joblib.Parallel with the tqdm progress bar (inspired by this comment).

This progress bar will track job completion, not job enqueueing. Previously this required a special context manager. Here's an example:

from joblib import Parallel, delayed
from tqdm import tqdm

import time
import random

# Our example worker will sleep for a certain number of seconds.

inputs = list(range(10))
random.shuffle(inputs)

def worker(n_seconds):
    time.sleep(n_seconds)
    return n_seconds

# Run the worker jobs in parallel, with a tqdm progress bar.
# We configure Parallel to return a generator.
# Then we wrap the generator in tqdm.
# Finally, we execute everything by converting the tqdm generator to a list.

outputs = list(
    tqdm(
        # Note the new return_as argument here, which requires joblib >= 1.3:
        Parallel(return_as="generator", n_jobs=3)(
            delayed(worker)(n_seconds) for n_seconds in inputs
        ),
        total=len(inputs),
    )
)
print(outputs)
Cloutier answered 20/7, 2023 at 1:19 Comment(2)
Excellent solution! I think now that joblib v1.3.0 is out, this should probably be the accepted answer. It works great and is much simpler than the other solutions.Jallier
This is neat, but it should be noted that return_as="generator" only works with a few selected backends (including loky and threading)Thinia

TLDR solution:

Works with joblib 0.14.0 and tqdm 4.46.0 using python 3.5. Credits to frenzykryger for contextlib suggestions, dano and Connor for monkey patching idea.

import contextlib
import joblib
from tqdm import tqdm
from joblib import Parallel, delayed

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""

    def tqdm_print_progress(self):
        if self.n_completed_tasks > tqdm_object.n:
            n_completed = self.n_completed_tasks - tqdm_object.n
            tqdm_object.update(n=n_completed)

    original_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = tqdm_print_progress

    try:
        yield tqdm_object
    finally:
        joblib.parallel.Parallel.print_progress = original_print_progress
        tqdm_object.close()

You can use this the same way as described by frenzykryger

import time
def some_method(wait_time):
    time.sleep(wait_time)

with tqdm_joblib(tqdm(desc="My method", total=10)) as progress_bar:
    Parallel(n_jobs=2)(delayed(some_method)(0.2) for i in range(10))

Longer explanation:

The solution by Jon is simple to implement, but it only measures dispatched tasks. If a task takes a long time, the bar will be stuck at 100% while waiting for the last dispatched task to finish executing.

The context manager approach by frenzykryger, building on dano and Connor, is better, but the BatchCompletionCallBack can also be called with ImmediateResult before the task completes (see Intermediate results from joblib). This gets us a count that goes over 100%.

Instead of monkey patching the BatchCompletionCallBack, we can just patch the print_progress function of Parallel, which the BatchCompletionCallBack calls anyway. If verbose is set (i.e. Parallel(n_jobs=2, verbose=100)), print_progress will print out completed tasks, though not as nicely as tqdm. Looking at the code, print_progress is a method on Parallel, so self.n_completed_tasks already logs the number we want. All we have to do is compare it with the current state of the tqdm bar and update only if there is a difference.

This was tested in joblib 0.14.0 and tqdm 4.46.0 using python 3.5.

Vincenza answered 8/5, 2020 at 22:32 Comment(0)

Text progress bar

One more variant for those who want a text progress bar without additional modules like tqdm. Tested with joblib 0.11 and Python 3.5.2 on Linux on 16.04.2018; it shows progress upon subtask completion.

Redefine native class:

import time
import joblib.parallel

class BatchCompletionCallBack(object):
    # Added code - start
    # relies on the module-level total_n_jobs constant defined below
    # Added code - end
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp

        self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)
        self.parallel.print_progress()
        # Added code - start
        progress = self.parallel.n_completed_tasks / total_n_jobs
        print(
            "\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50), progress * 100),
            end="", flush=True)
        if self.parallel.n_completed_tasks == total_n_jobs:
            print('\n')
        # Added code - end
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()

joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack

Define global constant before usage with total number of jobs:

total_n_jobs = 10

This will result in something like this:

Progress: [########################################          ] 80.0%
Uprising answered 16/4, 2018 at 13:13 Comment(1)
Works great. If you want to print a time estimate too, you can adapt __call__ with time_remaining = (this_batch_duration / self.batch_size) * (total_n_jobs - self.parallel.n_completed_tasks) and then add something like "est {:.1f} mins left".format(time_remaining / 60) to the progress print.Hugmetight

Here's another answer to your question with the following syntax:

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

https://mcmap.net/q/269483/-how-can-we-use-tqdm-in-a-parallel-execution-with-joblib
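
The linked answer implements ParallelExecutor as a small factory around Parallel and tqdm. A minimal sketch of the idea (the full version at the link also supports the txt bar shown above):

import joblib
from tqdm import tqdm

def ParallelExecutor(**joblib_args):
    def aprun(bar='tqdm', **tqdm_args):
        def run(op_iter):
            if bar == 'tqdm':
                # wrap the task iterator in tqdm; note this tracks
                # dispatch, not completion, like the plain tqdm answer
                op_iter = tqdm(op_iter, **tqdm_args)
            # bar=None falls through and runs without a progress bar
            return joblib.Parallel(**joblib_args)(op_iter)
        return run
    return aprun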

Sabu answered 4/11, 2016 at 5:35 Comment(0)

In Jupyter, plain tqdm starts a new output line each time it updates, so for Jupyter Notebook use tqdm.notebook instead.

Without sleeps:

from joblib import Parallel, delayed
from tqdm import notebook

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in notebook.tqdm(range(1000)))  

100% 1000/1000 [00:06<00:00, 143.70it/s]

With time.sleep:

from joblib import Parallel, delayed
from tqdm import notebook
from random import randint
import time

def myfun(x):
    time.sleep(randint(1, 5))
    return x**2

results = Parallel(n_jobs=7)(delayed(myfun)(i) for i in notebook.tqdm(range(100)))

What I'm currently using instead of joblib.Parallel:

import concurrent.futures
from tqdm import notebook
from random import randint
import time

iterable = [i for i in range(50)]

def myfun(x):
    time.sleep(randint(1, 5))
    return x**2

def run(func, iterable, max_workers=8):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(notebook.tqdm(executor.map(func, iterable), total=len(iterable)))
    return results

run(myfun, iterable)
Blakely answered 31/5, 2019 at 10:21 Comment(4)
Wrong, this only counts the job start times which will be immediate no matter what function you are wrapping.Mcneil
How can it be wrong if it's from the official documentation? joblib.readthedocs.io/en/latest Ctrl+F for "Parallel(n_jobs=1)" And my answer was about running tqdm in Jupyter notebook. It is almost the same as the accepted one. The only difference is that it is intended for use in Jupyter notebook.Delphadelphi
I think I got it. Looks like you're right.Delphadelphi
However, it is not instant in Jupyter notebook. For example, 14% 14/100 [00:05<00:31, 2.77it/s] It takes time to complete with random time sleeps.Delphadelphi

Setting verbose=13 was enough for me: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html
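
For example:

from joblib import Parallel, delayed
results = Parallel(n_jobs=16, verbose=13)(delayed(pow)(i, 2) for i in range(1000))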

I get a line on stderr that says something like:

[Parallel(n_jobs=16)]: Done 134 tasks      | elapsed:  7.7min
Bes answered 1/12, 2022 at 15:22 Comment(0)

import joblib
from tqdm import tqdm

class ProgressParallel(joblib.Parallel):
    def __init__(self, n_total_tasks=None, **kwargs):
        super().__init__(**kwargs)
        self.n_total_tasks = n_total_tasks

    def __call__(self, *args, **kwargs):
        # create the progress bar for the duration of the parallel run
        with tqdm() as self._pbar:
            return joblib.Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        # joblib calls print_progress after each completed batch;
        # override it to refresh the tqdm bar instead
        if self.n_total_tasks:
            self._pbar.total = self.n_total_tasks
        else:
            # total unknown up front: show the number dispatched so far
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()
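
A short usage sketch (n_total_tasks is this subclass's own argument, not joblib's; everything else is passed through to Parallel):

from math import sqrt
from joblib import delayed

results = ProgressParallel(n_jobs=2, n_total_tasks=10)(
    delayed(sqrt)(i**2) for i in range(10)
)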
Lorola answered 16/3, 2023 at 23:17 Comment(1)
Would you mind adding a bit of explanation to your code?Shuster
