Python multiprocess.Pool.map can't handle large arrays.

This is the code I use to parallelize an apply function over the rows of a pandas.DataFrame object:

import numpy as np
import pandas as pd
from pandas import DataFrame
from multiprocessing import cpu_count, Pool
from functools import partial

# Note: apply_wrapper is defined elsewhere and is not shown in the question.
def parallel_applymap_df(df: DataFrame, func, num_cores=cpu_count(), **kargs):
    # Split the rows into num_cores contiguous partitions of near-equal size
    partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
    df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
    pool = Pool(num_cores)
    series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
    pool.close()
    pool.join()

    return series

It works on a subsample of 200 000 rows, but when I try it on the full 200 000 000 rows, I get the following error message:

~/anaconda3/lib/python3.6/site-packages/multiprocess/connection.py in _send_bytes(self, buf)
394         n = len(buf)
395         # For wire compatibility with 3.2 and lower
--> 396         header = struct.pack("!i", n)
397         if n > 16384:
398             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647

Generated by the line:

series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))

This is very strange, because a slightly different version, which I use to parallelize operations that are not vectorized in pandas (like Series.dt.time), works on the same number of rows. This version, for example, works:

def parallel_map_df(df: DataFrame, func, num_cores=cpu_count()):

    partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
    df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()

    return df
Alatea answered 24/4, 2018 at 13:15 Comment(1)
I encountered a very similar issue. Have you found a solution/explanation for this? Goodfellowship

The error comes from the way multiprocessing moves data between the parent process and the workers in the pool. Everything sent to or received from a worker is serialized to bytes, and each message is preceded by a header that contains the length of the buffer, packed as a signed 32-bit integer (the "!i" format in the struct.pack call in your traceback). If the buffer is longer than 2147483647 bytes (about 2 GiB), its length cannot be represented in that header, so struct.pack fails and produces the error you show.
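The limit can be reproduced without any multiprocessing at all. The snippet below is only an illustration of the header packing step; fits_in_int32_header is a hypothetical helper name, not part of multiprocessing:

```python
import struct

# Largest value that the "!i" format (signed 32-bit, network byte order) can hold.
INT32_MAX = 2**31 - 1


def fits_in_int32_header(n_bytes: int) -> bool:
    """Return True if a buffer length can be packed with the "!i" format."""
    try:
        struct.pack("!i", n_bytes)
    except struct.error:
        return False
    return True


# A length exactly at the limit still packs into a 4-byte header.
print(len(struct.pack("!i", INT32_MAX)))

# One byte more raises struct.error, as in the traceback from the question.
print(fits_in_int32_header(INT32_MAX))
print(fits_in_int32_header(INT32_MAX + 1))
```

So the question is not how many rows a split has, but whether the pickled size of any single object sent to or from a worker stays below roughly 2 GiB.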

We're missing the data and quite a bit of code that is needed to reproduce your problem, so instead I'll provide a minimal working example:

import numpy
import pandas
import random

from functools import partial
from typing import List
from multiprocessing import cpu_count, Pool


def parallel_applymap_df(
    input_dataframe: pandas.DataFrame, func, num_cores: int = cpu_count(), **kwargs
) -> pandas.DataFrame:

    # Create splits in the dataframe of equal size (one split will be processed by one core)
    partitions = numpy.linspace(
        0, len(input_dataframe), num_cores + 1, dtype=numpy.int64
    )
    splits = [
        input_dataframe.iloc[partitions[i] : partitions[i + 1]]
        for i in range(num_cores)
    ]

    # Just for debugging, add metadata to each split
    for index, split in enumerate(splits):
        split.attrs["split_index"] = index

    # Create a pool of workers
    with Pool(num_cores) as pool:

        # Map the splits in the dataframe to workers in the pool. Extra keyword
        # arguments are bound to func with partial, because pool.map itself
        # only accepts chunksize
        result: List[pandas.DataFrame] = pool.map(partial(func, **kwargs), splits)

    # Combine all results of the workers into a new dataframe
    return pandas.concat(result)


# Defined at module level so that worker processes can import it, also when
# the "spawn" start method is used (the default on Windows and macOS)
def worker(df: pandas.DataFrame) -> pandas.DataFrame:

    # Print the length of the dataframe being processed (for debugging)
    print("Working on split #", df.attrs["split_index"], "Length:", len(df))

    # Do some arbitrary stuff to the split of the dataframe
    df["B"] = df.apply(lambda row: f"test_{row['A']}", axis=1)

    # Return the result
    return df


if __name__ == "__main__":

    # Create some test data
    df = pandas.DataFrame([{"A": random.randint(0, 100)} for _ in range(200000000)])

    # Create a new dataframe by applying the worker function to the dataframe in parallel
    df = parallel_applymap_df(df, worker)
    print(df)

Please note that this is probably not the fastest way to do this. For faster alternatives have a look at swifter or dask.
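If you want to stay with multiprocessing, one possible workaround that follows from the explanation of the header limit is to send more, smaller splits instead of one split per core, so that no single message comes close to 2 GiB. The sketch below is an assumption on my side, not something taken from the question: split_by_size, parallel_apply_chunked and MAX_CHUNK_BYTES are hypothetical names, and DataFrame.memory_usage only estimates the in-memory size, which can differ from the pickled size.

```python
import math
from multiprocessing import Pool

import pandas as pd

# Assumed safety margin: stay far below the 2**31 - 1 byte header limit.
MAX_CHUNK_BYTES = 512 * 1024**2


def split_by_size(df: pd.DataFrame, max_bytes: int = MAX_CHUNK_BYTES) -> list:
    """Split df into contiguous row slices of roughly max_bytes each.

    The size is estimated with memory_usage(deep=True) and rows are assumed
    to be about equally large, so individual slices may deviate somewhat.
    """
    if len(df) == 0:
        return [df]
    total_bytes = int(df.memory_usage(index=True, deep=True).sum())
    n_chunks = max(1, math.ceil(total_bytes / max_bytes))
    rows_per_chunk = math.ceil(len(df) / n_chunks)
    return [
        df.iloc[start : start + rows_per_chunk]
        for start in range(0, len(df), rows_per_chunk)
    ]


def parallel_apply_chunked(df, func, num_cores=None, max_bytes=MAX_CHUNK_BYTES):
    """Apply func to size-limited slices of df in a pool and concatenate the results."""
    chunks = split_by_size(df, max_bytes)
    with Pool(num_cores) as pool:
        # chunksize=1 hands each worker one slice per task
        results = pool.map(func, chunks, chunksize=1)
    return pd.concat(results)
```

As with the example above, call parallel_apply_chunked from under an if __name__ == "__main__": guard and pass a function defined at module level. Keep in mind that the results travel back through the same kind of connection, so each returned dataframe has to stay below the limit as well.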

Heptode answered 7/1, 2021 at 13:40 Comment(0)
