How to use a generator as an iterable with Multiprocessing map function
Asked Answered
H

3

18

When I use a generator as an iterable argument with multiprocessing.Pool.map function:

pool.map(func, iterable=(x for x in range(10)))

It seems that the generator is fully exhausted before func is ever called.

I want to yield each item and pass it to each process.

Heddi answered 22/6, 2017 at 19:57 Comment(7)
multiprocessing.Pool.map makes no guarantees about how soon func will be called on each argument extracted from the iterable.Barnyard
x for x in range(10) => range(10) :)Overlap
@Jean-FrançoisFabre range(10) returns a list, not a generator from my understandingHeddi
@Heddi In Python 2 it returns a list. In Python 3 it returns a generator. In Python 2 you can use xrange to get a generator.Decembrist
@DeepSpace: It's a lazy sequence, not a generator. In particular, iterating over it once won't exhaust it. That said, we don't need a generator, except perhaps for demonstration purposes.Barnyard
@Barnyard Indeed... well, it is still not a list ;)Decembrist
I was not able to replicate your issue, could you perhaps post more code so that we can see what's wrong?Panegyrize
B
21

multiprocessing.map converts iterables without a __len__ method to a list before processing. This is done to aid the calculation of chunksize, which the pool uses to group worker arguments and reduce the round trip cost of scheduling jobs. This is not optimal, especially when chunksize is 1, but since map must exhaust the iterator one way or the other, its usually not a significant issue.

The relevant code is in pool.py. Notice its use of len:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
        error_callback=None):
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
Backstage answered 22/6, 2017 at 20:38 Comment(0)
M
7

Alas, this isn't well-documented/defined. Here's a test case I'm running under Python 3.6.1:

import multiprocessing as mp

def e(i):
    if i % 1000000 == 0:
        print(i)

if __name__ == '__main__':
    p = mp.Pool()
    def g():
        for i in range(100000000):
            yield i
        print("generator done")
    r = p.map(e, g())
    p.close()
    p.join()

The first thing you see is the "generator done" message, and peak memory use is unreasonably high (precisely because, as you suspect, the generator is run to exhaustion before any work is passed out).

However, replace the map() call like so:

r = list(p.imap(e, g()))

Now memory use remains small, and "generator done" appears at the output end.

However, you won't wait long enough to see that, because it's horridly slow :-( imap() not only treats that iterable as an iterable, but effectively passes only 1 item at a time across process boundaries. To get speed back too, this works:

r = list(p.imap(e, g(), chunksize=10000))

In real life, I'm much more likely to iterate over an imap() (or imap_unordered()) result than to force it into a list, and then memory use remains small for looping over the results too.

Metzger answered 22/6, 2017 at 20:33 Comment(2)
map needs the length of the sequence for its chunksize calculation. It seems like this wouldn't be needed if you set a chunksize yourself, but I haven't gone through all of the code to make sure. Since map is also building a result set of equal length to the input, its bookkeeping may need to change if prestaging the input iterable was removed.Backstage
By "isn't well-defined" I simply mean that none of this can be deduced from the docs - it's consequences of implementation details. So there's no guarantee that map() will always be eager, or that imap() will always be lazy. But that's how they in fact behave right now.Metzger
C
1

To build on the answer by Tim Peters, here is a jupyter notebook demonstrating the interplay between imap and chunksize:

https://gist.github.com/shadiakiki1986/273b3529d3ff7afe2f2cac7b5ac96fe2

It has 2 examples:

Example 1 uses chunksize=1 and has the following execution:

    On CPU 1, execute item 1 from generator
    On CPU 2, execute item 2 from generator
    When CPU 1 done with item 1, execute item 3 from generator
    When CPU 2 done with item 2, execute item 4 from generator
    etc

Example 2 has chunksize=3 with the following execution

    On CPU 1, execute items 1-3 from generator
    On CPU 2, execute items 4-6 from generator
    When CPU 1 done with items 1-3, execute on 7-9
    When CPU 2 done with items 4-6, execute on 10

Notice in example 2 that item 10 is executed on CPU 2 before items 8 and 9 on CPU 1.

Culpable answered 13/7, 2021 at 15:49 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.