Multiprocessing of a function on a pandas dataframe

I have a large pandas dataframe with multiple "records", each consisting of 2 or more line items. I'm trying to efficiently perform a CPU-intensive calculation on each record using multiprocessing. Here's a simplified example with a function that just adds a random number to each record:

import pandas as pd
from random import randrange
from multiprocessing import Pool

#Trivial example function 
def my_func(record): 
    df.loc[((df.Record == record), 'Result')] = randrange(0,100)
    print (df)

d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]}
df = pd.DataFrame(d)
all_records = df['Record'].unique()

if __name__ == '__main__':
    pool = Pool(processes=2)
    pool.map(my_func,all_records)
    df.to_csv('output.csv')

The desired output is the original dataframe with a new column titled "Result" that includes a random number for each record. For example:

    Record  Values  Result
0      A     100    63.0
1      A     200    63.0
2      B      50    22.0
3      B      70    22.0

The actual result is that my CSV output isn't updated with a Result column. I can tell the processes are working from the print statement in the function. From what I've researched, each process acts on its own copy of df, and those copies are never brought back together. How can I get the results of each process reflected in a single dataframe?

Baculiform answered 24/12, 2017 at 3:58

This might work for you:

import pandas as pd
from random import randrange
from multiprocessing import Pool

# Trivial example function
def my_func(record):
    # take a copy of this record's rows to avoid a SettingWithCopyWarning
    sub_df = df.loc[df['Record'] == record].copy()
    sub_df['Result'] = randrange(0, 100)
    # return the results for this record as a pd.Series (keeps the original index)
    return sub_df['Result']

d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]}
df = pd.DataFrame(d)
all_records = df['Record'].unique()

if __name__ == '__main__':
    pool = Pool(processes=2)
    results = pool.map(my_func, all_records)
    pool.close()
    pool.join()

    # concatenate results into a single pd.Series
    results = pd.concat(results)

    # join results with original df
    joined_df = df.join(results)

    print(joined_df)
    #   Record  Values  Result
    # 0      A     100      90
    # 1      A     200      90
    # 2      B      50      62
    # 3      B      70      62
Wheelbase answered 24/12, 2017 at 11:57 Comment(4)
What about multiple arguments, e.g. my_func(record, arg1=arg1)? – Hanse
@Hanse You can use functools.partial to accomplish this: results = pool.map(partial(my_func, arg1=100), all_records) (see the sketch after these comments). – Wheelbase
There is an issue with memory leaking for a large df when you try to multiprocess it with pool.map. – Busybody
I'm afraid a copy of df is created for every child process, because d = ... is not inside if __name__ == '__main__':. – Kilian
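
For reference, here is a minimal sketch that combines the points raised in these comments. The arg1 parameter, the per-record split via groupby, and the reworked my_func signature are illustrative assumptions, not part of the original answer: build df only under the __main__ guard, hand each worker just the rows for its record, and bind extra constant arguments with functools.partial. This also avoids shipping the full df to every child process.

import pandas as pd
from functools import partial
from multiprocessing import Pool
from random import randrange

# Illustrative worker: receives only its own slice of the dataframe,
# plus an extra constant argument bound via functools.partial.
def my_func(sub_df, arg1=0):
    sub_df = sub_df.copy()
    sub_df['Result'] = randrange(0, 100) + arg1
    return sub_df

if __name__ == '__main__':
    # The dataframe is created only in the parent process.
    df = pd.DataFrame({'Record': ['A', 'A', 'B', 'B'],
                       'Values': [100, 200, 50, 70]})

    # One sub-frame per record; each worker gets only the rows it needs.
    pieces = [group for _, group in df.groupby('Record')]

    with Pool(processes=2) as pool:
        results = pool.map(partial(my_func, arg1=100), pieces)

    # Reassemble the per-record results in the original row order.
    print(pd.concat(results).sort_index())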
