Starmap combined with tqdm?

I am doing some parallel processing, as follows:

with mp.Pool(8) as tmpPool:
    results = tmpPool.starmap(my_function, inputs)

where inputs look like: [(1,0.2312),(5,0.52) ...] i.e., tuples of an int and a float.

The code runs fine, but I cannot seem to wrap it in a progress bar (tqdm), as can be done with, e.g., the imap method:

tqdm.tqdm(tmpPool.imap(some_function, some_inputs))

Can this be done for starmap also?

Thanks!

Miscellanea answered 5/8, 2019 at 8:20 Comment(4)
If possible, I would change my_function to receive one packed argument, unpack it inside the function, and then use imap. - Sandblast
Yes, that is the default solution currently. I am still wondering whether starmap supports this (or any variant of it). - Miscellanea
Not that I'm aware of or can see in the docs. The only variant I know of is starmap_async, which is simply non-blocking but still returns a result object. I believe you will have to adjust your function to work with imap, as it is the only option that works as a generator instead of returning all results at once. I will be happy to see if there is a better solution. - Sandblast
Thanks. For now, I've re-implemented it with imap. It would be nice to have an istarmap too! - Miscellanea
M
51

It's not possible with starmap(), but it is possible with a patch that adds Pool.istarmap(), based on the code for imap(). All you have to do is create the istarmap.py file and import the module to apply the patch before you make your regular multiprocessing imports.

Python <3.8

# istarmap.py for Python <3.8
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Python 3.8+

# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    self._check_running()
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Then in your script:

import istarmap  # import to apply patch
from multiprocessing import Pool
import tqdm    


def foo(a, b):
    for _ in range(int(50e6)):
        pass
    return a, b    


if __name__ == '__main__':

    with Pool(4) as pool:
        iterable = [(i, 'x') for i in range(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass
Mopboard answered 5/8, 2019 at 18:44 Comment(9)
Very nice, this is exactly what I was after! Thanks! - Miscellanea
I get AttributeError: '_PoolCache' object has no attribute '_cache'. Any ideas? It occurs at the line result = mp.IMapIterator(self._cache). - Cherlynchernow
@Cherlynchernow I'm using mpp as the name for the module; your example uses mp. Do you get the error with exactly my example from the answer, too? - Mopboard
Yes, I just replaced mpp with mp; it's a personal convention, sorry. I do get the error with the same code, but it was because I had not named the module istarmap. I am currently having trouble bundling it into my own module, however: I can't figure out the import statement if I put istarmap as a submodule in my own module. - Cherlynchernow
@Cherlynchernow That's okay, I just didn't know whether you had some other module named mp. I'm afraid that's not really enough information to understand your problem, but you need to import istarmap before you import anything else from multiprocessing. - Mopboard
@Cherlynchernow I get the same error you did. For me it had nothing to do with the abbreviation mpp vs mp; I had to alter the following line. Instead of result = mpp.IMapIterator(self._cache) I need result = mpp.IMapIterator(self). Inspecting it in the debugger shows that the constructor of IMapIterator needs a Pool instance, not the cache (cf. github.com/python/cpython/blob/3.8/Lib/multiprocessing/…). - Emoryemote
@Emoryemote Thanks, I looked into it and saw that they made some breaking changes in Python 3.8. There's a bit more to tweak than just replacing self._cache; I've updated my answer with a version for Python 3.8. - Mopboard
@Mopboard With your code, how do you retrieve the outputs of the function? With starmap, for example, we just used result = pool.starmap(....). - Vintager
@JulienDrevon It's already iterating over the results. In the example each result is assigned to _ in for _ in tqdm.tqdm(..., because the result doesn't get used, but that's just a convention for this case. You could write for result in tqdm.tqdm(... and then print(result) on every iteration, or do whatever you want with it. - Mopboard
C
54

The simplest way would probably be to apply tqdm() around the inputs, rather than the mapping function. For example:

inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
    results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(param1)))

Note that the bar is updated when an input is taken from the iterable and handed over for my_function to process, rather than when my_function returns. If that distinction matters, you can consider rewriting starmap as some other answers suggest. Otherwise, this is a simple and efficient alternative.

Choler answered 23/1, 2021 at 1:56 Comment(12)
Thanks a lot. This should be the accepted answer, I think. I had to pass the input length as total to tqdm to make it work. - Fimbria
You're correct, you'll likely need the total arg for streaming/lazy iterables. - Choler
Quick update: this did provide the progress bar, but the updates were not as dynamic as I hoped. It froze too much. - Fimbria
Did you use chunksize != 1? It's possible elements were being pulled from the input in chunks, so the progress bar updated irregularly. - Choler
Thanks, having chunksize != 1 helped and made it a smoother bar! - Fimbria
res_1, res_2 = zip(*pool.starmap(order2seq_multiproc, tqdm(tasks, total=len(tasks)))). This is my code, but the progress bar is always full and never updates. Where should I add the chunksize != 1? I put it right after total and it doesn't work, saying "name 'chunksize' is not defined". - Hydrus
chunksize is a parameter of starmap, not tqdm. So try pool.starmap(order2seq_multiproc, tqdm(input), chunksize=chunksize). - Choler
Note: a zip object doesn't have a length. Instead, total=len(param1) would work. - Dekko
This seems to track only when the inputs are being sent, not when the processing of my_function is completed. - Haskins
I'm not sure if you're aware, but gofvonx is right. This measures input progression, not output progression. That's also why this appears to be faster, as some people commented. Now imagine all but the last task taking five seconds and the last one taking an hour to complete. You could end up seeing 100% progress displayed for almost an hour before the actual finish... - Mopboard
The progress bar advances very quickly to 100%, and then the program keeps running until it is done. - Blob
Yeah, this doesn't work anymore because the implementation of starmap creates the tasks straightaway. - Frierson
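The input-versus-output distinction raised in these comments can be checked with a small experiment. The sketch below is only an illustration under stated assumptions: counting() is a hypothetical stand-in for tqdm that counts items as they are pulled from the iterable, and a ThreadPool replaces mp.Pool so that the worker function can read the shared counter. It relies on how CPython's starmap appears to treat an iterable without a length (it is turned into a list before any task runs), which is an implementation detail rather than documented behaviour.

```python
from multiprocessing.pool import ThreadPool

consumed = 0        # how many inputs have been pulled from the iterable so far
seen_at_call = []   # value of `consumed` observed inside each call to work()


def counting(iterable):
    """Yield items unchanged while counting them (hypothetical stand-in for tqdm)."""
    global consumed
    for item in iterable:
        consumed += 1
        yield item


def work(a, b):
    # Record how far the "progress bar" had advanced when this call started.
    seen_at_call.append(consumed)
    return a + b


inputs = [(i, i) for i in range(20)]

# ThreadPool shares Pool's interface; threads are used here only so that
# work() can see the same `consumed` counter as the main thread.
with ThreadPool(4) as pool:
    results = pool.starmap(work, counting(inputs))

print(min(seen_at_call), "of", len(inputs), "inputs were consumed before the first call ran")
```

If the assumption holds, every call observes a fully advanced counter, which is the "bar jumps to 100% and then the program keeps running" effect described above. A bar that should track completed work has to wrap the results (imap or the istarmap patch), not the inputs.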
C
20

As Darkonaut mentioned, as of this writing there's no istarmap natively available. If you want to avoid patching, you can add a simple *_star function as a workaround. (This solution was inspired by this tutorial.)

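import multiprocessing

import tqdm


def my_function(arg1, arg2, arg3):
    return arg1 + arg2 + arg3


def my_function_star(args):
    # imap passes a single argument, so unpack the tuple here
    return my_function(*args)


if __name__ == '__main__':
    jobs = 4
    args = [(i, i, i) for i in range(10000)]
    with multiprocessing.Pool(jobs) as pool:
        # imap yields results lazily and in order, so the bar tracks finished work
        results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args)))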

Some notes:

I also really like corey's answer. It's cleaner, though the progress bar does not appear to update as smoothly as with my answer. Note that with the code I posted above and the default chunksize=1, corey's answer is several orders of magnitude faster. I'm guessing this is due to multiprocessing serialization, because increasing chunksize (or having a more expensive my_function) makes the two runtimes comparable.

I went with my answer for my application since my serialization/function cost ratio was very low.

Casimir answered 4/6, 2021 at 23:47 Comment(3)
This is the best answer! Your notes about corey's answer are on point! - Presto
This is great, thanks! I think, as an extension to this, you can write a general function f_star(f, args) that returns f(*args). Then you can keep it as a utility function and use it anywhere you want to use tqdm with starmap. - Dollie
Sorry, I made a mistake in my suggestion: it should say f_star(f_args) takes a tuple of (f, args) and returns f(*args). - Dollie
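One possible shape for the general helper suggested in these comments is sketched below. It is a variant, not the commenter's exact proposal: instead of packing (f, args) into each task tuple, it binds the function once with functools.partial. The names call_star, imap_star and add3 are made up for this illustration, and the demo uses a ThreadPool only so that it runs as-is; a multiprocessing.Pool exposes the same imap() and can be passed instead.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def call_star(func, args):
    """Call func with an argument tuple unpacked: call_star(f, (a, b)) is f(a, b)."""
    return func(*args)


def imap_star(pool, func, iterable, chunksize=1):
    """Lazy, ordered starmap built on pool.imap.

    Returns an iterator that yields func(*args) in input order, each result
    becoming available once it has been computed, so the iterator can be
    wrapped in a progress bar.
    """
    # A partial of a module-level function can be pickled for process pools,
    # provided func itself is picklable (e.g. defined at module level).
    return pool.imap(partial(call_star, func), iterable, chunksize)


def add3(a, b, c):
    return a + b + c


# Demo with threads; pass a multiprocessing.Pool for CPU-bound work.
with ThreadPool(2) as pool:
    totals = list(imap_star(pool, add3, [(i, i, i) for i in range(10)], chunksize=4))

print(totals)
```

With tqdm installed, the same iterator can be wrapped as list(tqdm.tqdm(imap_star(pool, my_function, args), total=len(args))). A larger chunksize should reduce per-task overhead, at the cost of results arriving in bursts.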
M
-4

The temporary solution: rewriting the method to-be-parallelized with imap.

Miscellanea answered 5/8, 2019 at 11:23 Comment(0)
