Shared-memory objects in multiprocessing

Suppose I have a large in-memory numpy array, and I have a function func that takes this giant array as input (together with some other parameters). func with different parameters can be run in parallel. For example:

from multiprocessing import Pool

def func(arr, param):
    # do stuff to arr, param
    pass

# build array arr

pool = Pool(processes=6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

If I use the multiprocessing library, that giant array will be copied multiple times into different processes.

Is there a way to let different processes share the same array? This array object is read-only and will never be modified.

To make things more complicated, if arr is not an array but an arbitrary Python object, is there a way to share it?

[EDITED]

I read the answer, but I am still a bit confused. Since fork() is copy-on-write, spawning new processes in Python's multiprocessing library should not incur any additional cost. But the following code suggests there is a huge overhead:

from multiprocessing import Pool
import numpy as np
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t

pool = Pool(processes=6)

t = time.time()
res = pool.apply_async(f, [arr])
res.get()
print "multiprocessing overhead = ", time.time() - t

Output (by the way, the cost increases as the size of the array increases, so I suspect there is still overhead related to memory copying):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Why is there such a huge overhead if we didn't copy the array? And which part does shared memory save me?

Osrock asked 23/5, 2012 at 14:20 Comment(5)
possible duplicate of Is shared readonly data copied to different processes for Python multiprocessing?Wilburn
You have looked at the docs, right?Marcin
@FrancisAvila is there a way to share not just array, but arbitrary python objects?Osrock
@LevLevitsky I have to ask, is there a way to share not just array, but arbitrary python objects?Osrock
This answer explains nicely why arbitrary Python objects can't be shared.Meemeece

If you use an operating system that uses copy-on-write fork() semantics (like any common Unix), then as long as you never alter your data structure, it will be available to all child processes without taking up additional memory. You will not have to do anything special (except make absolutely sure you don't alter the object).
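
For example, assuming a fork start method (the default on Linux), a minimal sketch of that approach might look like the following; arr and func here are placeholders, not your actual code:

import numpy as np
from multiprocessing import Pool

# Build the array at module level, before the Pool is created,
# so that forked workers inherit it via copy-on-write.
arr = np.arange(10000000)

def func(param):
    # Reference the module-level arr directly instead of passing it
    # as an argument, so it is never pickled and sent to the workers.
    return arr.sum() * param

if __name__ == '__main__':
    pool = Pool(processes=6)
    results = [pool.apply_async(func, [param]) for param in range(10)]
    output = [res.get() for res in results]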

The most efficient thing you can do for your problem would be to pack your array into an efficient array structure (using numpy or array), place that in shared memory, wrap it with multiprocessing.Array, and pass that to your functions. This answer shows how to do that.
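
A minimal sketch of that approach, assuming a fork start method and double-precision data (the names here are mine, not taken from the linked answer), could look like:

import ctypes
import numpy as np
from multiprocessing import Pool, Array

# Allocate the buffer in shared memory; lock=False is fine here because
# the data is written only once, before the workers start reading it.
shared_base = Array(ctypes.c_double, 10000000, lock=False)
shared_arr = np.frombuffer(shared_base, dtype=np.float64)
shared_arr[:] = np.random.rand(10000000)  # fill it once in the parent

def func(param):
    # Workers inherit shared_arr; the underlying buffer is never copied.
    return shared_arr[param]

if __name__ == '__main__':
    pool = Pool(processes=4)
    print(pool.map(func, range(4)))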

If you want a writeable shared object, then you will need to wrap it with some kind of synchronization or locking. multiprocessing provides two ways of doing this: shared memory (suitable for simple values, arrays, or ctypes) or a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).

The Manager approach can be used with arbitrary Python objects, but will be slower than the equivalent using shared memory because the objects need to be serialized/deserialized and sent between processes.
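
As a rough illustration of the Manager route (the object names here are illustrative, not from the question), assuming the shared object is an ordinary picklable Python object:

from multiprocessing import Manager, Pool

def func(shared, key):
    # Each lookup goes through the manager process: the value is pickled
    # there and unpickled here, so large objects are copied on every access.
    return len(shared[key])

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.dict()
        shared['data'] = list(range(1000000))  # any picklable object
        with Pool(processes=4) as pool:
            results = [pool.apply_async(func, [shared, 'data']) for _ in range(4)]
            print([res.get() for res in results])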

There is a wealth of parallel processing libraries and approaches available in Python. multiprocessing is an excellent and well-rounded library, but if you have special needs, one of the other approaches may be better.

Wilburn answered 23/5, 2012 at 16:42 Comment(9)
Thanks for your reply, Francis. I am still a bit confused so I added another question to my original post. If you can look at it, that would be great.Osrock
Multiprocessing communicates with subprocesses using pickling and message passing, so by including your array as an argument to apply_async it is getting serialized and copied. To avoid copying you would need a reference to the object made before calling apply_async (and do not include arr in the argument list), or wrap in multiprocessing.Array and pass that.Wilburn
Just to note, on CPython fork() actually means copy-on-access (because just accessing the object will change its ref-count).Lisabethlisan
@FabioZadrozny Would it actually copy the entire object, or just the memory page containing its refcount?Roundtree
AFAIK, only the memory page containing the refcount (so, 4kb on each object access).Lisabethlisan
@FrancisAvila "To avoid copying you would need a reference to the object made before calling apply_async (and do not include arr in the argument list)": how would you do that?Nel
@Nel Use a closure. The function given to apply_async should reference the shared object in scope directly instead of through its arguments.Wilburn
@FrancisAvila how do you use a closure? Shouldn't the function that you give to apply_async be picklable? Or is this only a map_async restriction?Insole
If you want to avoid the copy-on-access problem, use pypy. Since it's not refcounted, it doesn't suffer from the same refcounting issues.Flagg

This is the intended use case for Ray, which is a library for parallel and distributed Python. Under the hood, it serializes objects using the Apache Arrow data layout (which is a zero-copy format) and stores them in a shared-memory object store so they can be accessed by multiple processes without creating copies.

The code would look like the following.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

If you don't call ray.put then the array will still be stored in shared memory, but that will be done once per invocation of func, which is not what you want.
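
In other words (this contrast just restates the point above in code, using the same func and array as in the snippet):

# Passing the array directly: it is placed in the object store again
# for every task, i.e. once per invocation of func.
result_ids = [func.remote(array, i) for i in range(4)]

# Passing the object ID: the array is put into shared memory once and
# every task reads it from there without copying.
array_id = ray.put(array)
result_ids = [func.remote(array_id, i) for i in range(4)]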

Note that this will work not only for arrays but also for objects that contain arrays, e.g., dictionaries mapping ints to arrays as below.

You can compare the performance of serialization in Ray versus pickle by running the following in IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Serialization with Ray is only slightly faster than pickle, but deserialization is 1000x faster because of the use of shared memory (this number will of course depend on the object).

See the Ray documentation. You can read more about fast serialization using Ray and Arrow. Note that I'm one of the Ray developers.

Mcrae answered 6/2, 2019 at 21:44 Comment(5)
Ray sounds good! I've tried using this library before, but unfortunately I just realized that Ray doesn't support Windows. I hope you guys can support Windows ASAP. Thank you, developers!Sperm
Ray supports Windows and it's stable; the VC++ runtime must be installed, but that is easy and comes with Visual Studio out of the box.Extractive
Ray currently supports MacOS and Linux. Windows wheels are now available, but Windows support is experimental and under development. - Latest from Ray's website docs.ray.io/en/latest/installation.htmlDelamination
Ray supports Windows now :)Mcrae
@RobertNishihara I tried loading a list of dictionaries in shared memory, but it turns out zero-copy only works for numeric numpy arrays. Is there any way I could use Ray to share this dataset between processes without copying it? I did it with a multiprocessing Manager but the reading is way too slow.Dall

I ran into the same problem and wrote a little shared-memory utility class to work around it.

I'm using multiprocessing.RawArray (lock-free), and access to the arrays is not synchronized at all (lock-free as well), so be careful not to shoot yourself in the foot.

With this solution I get speedups by a factor of approximately 3 on a quad-core i7.

Here's the code; feel free to use and improve it, and please report back any bugs.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(cls)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        # (cast to a plain int, since RawArray rejects numpy integer sizes)
        shared_array_base = multiprocessing.RawArray(ctype, int(np.prod(dimensions)))

        # convert to numpy array via ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # reshape to the requested dimensions;
        # reshape returns an array with the same data but a new shape,
        # and the result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cur].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
Pub answered 15/5, 2013 at 10:55 Comment(3)
Just realized that you have to set up your shared memory arrays before you create the multiprocessing Pool; I don't know why yet, but it definitely won't work the other way round.Pub
The reason is that the multiprocessing Pool calls fork() when the Pool is instantiated, so anything created after that point won't be accessible to the workers, since they only see shared memory that existed at fork time.Delindadelineate
When I tried this code under py35 I got an exception in multiprocessing.sharedctypes.py, so I guess this code is for py2 only.Cremona

As Robert Nishihara mentioned, Apache Arrow makes this easy, specifically with the Plasma in-memory object store, which is what Ray is built on.

I made brain-plasma specifically for this reason: fast loading and reloading of big objects in a Flask app. It's a shared-memory object namespace for Apache Arrow-serializable objects, including pickled bytestrings generated by pickle.dumps(...).

The key difference from Ray and Plasma is that brain-plasma keeps track of object IDs for you. Any processes, threads, or programs running locally can share the variables' values by referencing the name through any Brain object.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
Synchronous answered 22/5, 2019 at 20:41 Comment(0)
