Multiprocessing in Python: sharing a large object (e.g. a pandas dataframe) between multiple processes

I am using Python multiprocessing, more precisely

from multiprocessing import Pool
p = Pool(15)

args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()

This approach has huge memory consumption, eating up pretty much all my RAM (at which point it gets extremely slow, which makes the multiprocessing pretty useless). I assume the problem is that df is a huge object (a large pandas dataframe) and gets copied for each process. I have tried using multiprocessing.Value to share the dataframe without copying:

shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...] 

(as suggested in Python multiprocessing shared memory), but that gives me TypeError: this type has no size (the same error as in Sharing a complex object between Python processes?, whose answer I unfortunately don't understand).

I am using multiprocessing for the first time and maybe my understanding is not (yet) good enough. Is multiprocessing.Value actually even the right thing to use in this case? I have seen other suggestions (e.g. queue) but am by now a bit confused. What options are there to share memory, and which one would be best in this case?

Bootless answered 18/3, 2014 at 17:56 Comment(2)
see recent related question: #22468779.Refutative
Is there a more recent way to do this, or is using Namespace still the best approach? How did you end up solving it, @Bootless?Collapse

The first argument to Value is typecode_or_type. That is defined as:

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

The key phrase is "a ctypes type". So you simply cannot put a pandas dataframe in a Value; it has to be a ctypes type.
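
For contrast, a minimal sketch of what Value is designed for: a single ctypes scalar shared between processes (the counter below is purely illustrative).

import ctypes
from multiprocessing import Value

counter = Value(ctypes.c_int, 0)  # equivalently Value('i', 0), using an array typecode
with counter.get_lock():          # synchronized access across processes
    counter.value += 1
print(counter.value)              # 1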

You could instead use a multiprocessing.Manager to serve your singleton dataframe instance to all of your processes. There are a few different ways to end up in the same place; probably the easiest is to just plop your dataframe into the manager's Namespace.

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()
ns.df = my_dataframe

# now just give your processes access to ns, i.e. most simply
# p = Process(target=worker, args=(ns, work_unit))

Now your dataframe instance is accessible to any process that gets passed a reference to the Manager. More cleanly, just pass a reference to the Namespace itself.
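
For reference, a minimal end-to-end sketch of this approach with a hypothetical read-only worker. Note that each attribute access on the Namespace proxy (ns.df) transfers the dataframe from the manager process to the worker, so grab it once.

from multiprocessing import Manager, Process
import pandas as pd

def worker(ns, config):
    df = ns.df               # fetches the frame from the manager process
    print(config, df.shape)  # do the real work with df and config here

if __name__ == '__main__':
    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = pd.DataFrame({'a': range(10), 'b': range(10)})

    procs = [Process(target=worker, args=(ns, c)) for c in ('config1', 'config2')]
    for p in procs:
        p.start()
    for p in procs:
        p.join()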

One thing I didn't/won't cover is events and signaling: if your processes need to wait for others to finish executing, you'll need to add that in. Here is a page with some Event examples, which also covers in a bit more detail how to use the manager's Namespace.
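
As a bare-bones illustration of that kind of signaling (the worker and the sleep are made up for the example):

import time
from multiprocessing import Event, Process

def waiter(ev):
    ev.wait()      # block until the main process sets the event
    print('go')

if __name__ == '__main__':
    ev = Event()
    p = Process(target=waiter, args=(ev,))
    p.start()
    time.sleep(1)  # pretend to do some setup work
    ev.set()       # release the waiting process
    p.join()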

(Note that none of this addresses whether multiprocessing is going to result in tangible performance benefits; this is just giving you the tools to explore that question.)

Cryptozoic answered 18/3, 2014 at 18:24 Comment(10)
Thanks, this has made the memory consumption a lot better. It is still much higher than I would have thought though - how do I find out where the memory consumption is coming from?Bootless
@Anne, this Namespace approach causes a lot of memory consumption for me, too. I've tried this with a DF with millions of rows and 6 columns (taking up 2 GB of RAM), and the workers end up with about that much usage, too. What's more, access to the data that is fast (< 1ms) when profiled in non-multiprocessing becomes really slow for the worker in multiprocessing context. Even after the mem usage swells in worker, a single ns.df.loc[ix] call can take several seconds. @Cryptozoic and @Jeff, do you have any ideas about this?Benedick
Trying this approach I can read the df from shared memory, but I was not able to change its values.Unbodied
I tried this approach with a big df (loaded from a ~9Gb csv), and I got a formatting error which I'm guessing is because of the size of the df, "struct.error: 'i' format requires -2147483648 <= number <= 2147483647", any suggestions? Is there a different technique available?Congeries
I found that for big data structures, big dictionaries or pandas dataframes, it is better to implement this approach #48465065Congeries
Is it possible to use namespace together with Pool and Pool.map? I'm curious because map only takes one argument, say this argument is a list of tuples. Do we need to propagate the namespace to all the tuples in this list?Petiolate
Say we create two "Processes", or a "Pool" of processes, does whatever is inside "namespace" get copied over to the different sub-processes? I'm very confused and very interested to see if using the multiprocessing.Manager can avoid huge memory consumption.Petiolate
You are getting slowdowns and high memory usage because this approach stores the shared object to temporary memory on hard disk. Then each process loads that into RAM, and that's also why you get higher RAM usage. The only way to achieve true in-memory sharing is via multiprocessing.shared_memory, available in Python 3.8 and later.Reduplicative
This is not a solution for sharing a dataframe between processes. Related topic: #72799054Apocrypha
@Cryptozoic The original question was about a Pool, but your solution works only with Process. Is there a way to use your namespace based approach with a Pool()?Palliate

You can use Array instead of Value for storing your dataframe.

The solution below converts a pandas dataframe to an object that stores its data in shared memory:

import numpy as np
import pandas as pd
import multiprocessing as mp
import ctypes

# the original dataframe is df; store the column/dtype pairs
df_dtypes_dict = dict(list(zip(df.columns, df.dtypes)))

# declare a shared Array with data from df
mparr = mp.Array(ctypes.c_double, df.values.reshape(-1))

# create a new df based on the shared array
df_shared = pd.DataFrame(np.frombuffer(mparr.get_obj()).reshape(df.shape),
                         columns=df.columns).astype(df_dtypes_dict)

If you now share df_shared across processes, no additional copies will be made. For your case:

def fun(config):
    # df_shared is global to the script
    return df_shared.apply(config)  # whatever compute you do with df/config

pool = mp.Pool(15)

config_list = [config1, config2]
res = pool.map_async(fun, config_list)
pool.close()
pool.join()

This is also particularly useful if you use pandarallel, for example:

# this will not explode in memory
from pandarallel import pandarallel
pandarallel.initialize()
df_shared.parallel_apply(your_fun, axis=1)

Note: with this solution you end up with two dataframes (df and df_shared), which consume twice the memory and take a long time to initialise. It might be possible to read the data directly into shared memory; a rough sketch of that idea follows.
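
A minimal sketch of that idea, assuming purely numeric data with a known shape and a hypothetical data.csv on disk: allocate the shared Array first, wrap it in a dataframe, and fill the buffer in place so the full dataset only ever exists once.

import ctypes
import multiprocessing as mp
import numpy as np
import pandas as pd

n_rows, n_cols = 1_000_000, 6  # assumed known in advance
mparr = mp.Array(ctypes.c_double, n_rows * n_cols, lock=False)

# numpy view over the shared buffer (no copy)
buf = np.frombuffer(mparr, dtype=np.float64).reshape(n_rows, n_cols)
df_shared = pd.DataFrame(buf, columns=list('ABCDEF'))

# fill the shared buffer chunk by chunk instead of materialising df first
offset = 0
for chunk in pd.read_csv('data.csv', chunksize=100_000):
    vals = chunk.to_numpy(dtype=np.float64)
    buf[offset:offset + len(vals), :] = vals
    offset += len(vals)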

Sparoid answered 12/6, 2020 at 9:40 Comment(2)
This seems to be the only working approach of sharing a pandas dataframe without copying to EACH subprocess, while being able to use multicore computing. Other stuff like namespaces and managers in MP still creates copies. Only with Array, the main process mem usage goes from 7x (while sharing) to 3x (after sharing, while running) of the original df's (at least on Win), and recreation of the df takes significant time. Is there any way to further optimize/speed this up?Basicity
I have a large dataframe (20 GB+), and it's extremely slow to create df_shared.Zeculon

You can share a pandas dataframe between processes without any memory overhead by creating a data_handler child process. This process receives calls from the other children with specific data requests (e.g. a row, a specific cell, a slice, etc.) from your very large dataframe object. Only the data_handler process keeps your dataframe in memory, unlike a Manager such as Namespace, which causes the dataframe to be copied to every child process. See below for a working example. This can be converted to use a Pool.

Need a progress bar for this? see my answer here: https://mcmap.net/q/150000/-show-the-progress-of-a-python-multiprocessing-pool-imap_unordered-call

import time
import queue
import numpy as np
import pandas as pd
import multiprocessing
from random import randint

#==========================================================
# DATA HANDLER
#==========================================================

def data_handler( queue_c, queue_r, queue_d, n_processes ):

    # Create a big dataframe
    big_df = pd.DataFrame(np.random.randint(
        0,100,size=(100, 4)), columns=list('ABCD'))

    # Handle data requests
    finished = 0
    while finished < n_processes:

        try:
            # Get the index we sent in
            idx = queue_c.get(False)

        except queue.Empty:
            continue
        else:
            if idx == 'finished':
                finished += 1
            else:
                try:
                    # Use the big_df here!
                    B_data = big_df.loc[ idx, 'B' ]

                    # Send back some data
                    queue_r.put(B_data)
                except Exception:
                    pass

# big_df may need to be deleted at the end. 
#import gc; del big_df; gc.collect()

#==========================================================
# PROCESS DATA
#==========================================================

def process_data( queue_c, queue_r, queue_d):

    data = []

    # Save computer memory with a generator
    generator = ( randint(0,x) for x in range(100) )

    for g in generator:

        """
        Lets make a request by sending
        in the index of the data we want. 
        Keep in mind you may receive another 
        child processes return call, which is
        fine if order isnt important.
        """

        #print(g)

        # Send an index value
        queue_c.put(g)

        # Handle the return call
        while True:
            try:
                return_call = queue_r.get(False)
            except queue.Empty:
                continue
            else:
                data.append(return_call)
                break

    queue_c.put('finished')
    queue_d.put(data)   

#==========================================================
# START MULTIPROCESSING
#==========================================================

def multiprocess( n_processes ):

    combined  = []
    processes = []

    # Create queues
    queue_data = multiprocessing.Queue()
    queue_call = multiprocessing.Queue()
    queue_receive = multiprocessing.Queue()

    for process in range(n_processes): 

        if process == 0:

            # Load your data_handler once here
            p = multiprocessing.Process(target=data_handler,
                args=(queue_call, queue_receive, queue_data, n_processes))
            processes.append(p)
            p.start()

        p = multiprocessing.Process(target=process_data,
            args=(queue_call, queue_receive, queue_data))
        processes.append(p)
        p.start()

    for i in range(n_processes):
        data_list = queue_data.get()    
        combined += data_list

    for p in processes:
        p.join()    

    # Your B values
    print(combined)


if __name__ == "__main__":

    multiprocess( n_processes = 4 )
Decortication answered 13/4, 2019 at 19:42 Comment(1)
Hey, thanks for sharing this. It seems to be working. A small query on this, doesn't the Queue in the data_handler process make the process_handlers run one at a time? Till the result is computed the process_handler just waits right? Aren't we losing the main purpose of parallelizing here? Please do correct me if I am wrong here. Thanks!Varve

Python 3.6 and later support storing a pandas DataFrame in a multiprocessing.Value. See below for a working example:

import ctypes
import pandas as pd
from multiprocessing import Value

df = pd.DataFrame({'a': range(0,9),
                   'b': range(10,19),
                   'c': range(100,109)})

k = Value(ctypes.py_object)
k.value = df

print(k.value)
Bandaid answered 24/9, 2021 at 22:11 Comment(2)
Even doing this, my RAM consumption still grew.Tropology
I gave this a try. It works until I pass the Value object as an argument to Process; then it always crashes if I try to pull out the value in the child process. This code would be better with a full example.Blaze

I was pretty surprised that joblib's Parallel (since 1.0.1 at least) already supports sharing pandas dataframes with multiprocess workers out of the box, at least with the 'loky' backend. One thing I figured out experimentally: the parameters you pass to the function should not contain any large dict; if they do, turn the dict into a Series or DataFrame. Some additional memory certainly gets used by each worker, but much less than the size of your supposedly 'big' dataframe residing in the main process, and the computation begins right away in all workers. Otherwise, joblib starts all your requested workers, but they hang idle while objects are copied into each one sequentially, which takes a long time. I can provide a code sample if someone needs it. I have only tested dataframe processing in read-only mode. The feature is not mentioned in the docs, but it works for pandas.
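
A minimal sketch of the kind of call described, with a made-up read-only worker; whether memory actually stays shared in your case is something to verify by watching the workers' usage, as noted above.

import pandas as pd
from joblib import Parallel, delayed

def work(frame, i):
    # read-only access to the big dataframe
    return int(frame['a'].iloc[i]) * 2

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(1_000_000), 'b': range(1_000_000)})

    # 'loky' is joblib's default process-based backend
    results = Parallel(n_jobs=4, backend='loky')(
        delayed(work)(df, i) for i in range(8)
    )
    print(results)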

Basicity answered 23/12, 2021 at 5:30 Comment(2)
A code example would be great! I never got loky to work with my problem.Unceremonious
The code sample is here: github.com/joblib/joblib/issues/1244Basicity
