How do I pass large numpy arrays between python subprocesses without saving to disk?

Is there a good way to pass a large chunk of data between two python subprocesses without using the disk? Here's a cartoon example of what I'm hoping to accomplish:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done\n')

This creates a subprocess which generates a numpy array and saves the array to disk. The parent process then loads the array from disk. It works!

The problem is, our hardware can generate data 10x faster than the disk can read/write. Is there a way to transfer data from one python process to another purely in-memory, maybe even without making a copy of the data? Can I do something like passing-by-reference?

My first attempt at transferring data purely in-memory is pretty lousy:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        ##Note that this breaks if the array contains a 10 (the newline byte):
        sys.stdout.write(data.tostring() + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
    print a.shape

proc.stdin.write('done\n')

This is extremely slow (much slower than saving to disk) and very, very fragile. There's got to be a better way!

I'm not married to the 'subprocess' module, as long as the data-taking process doesn't block the parent application. I briefly tried 'multiprocessing', but without success so far.

Background: We have a piece of hardware that generates up to ~2 GB/s of data in a series of ctypes buffers. The python code to handle these buffers has its hands full just dealing with the flood of information. I want to coordinate this flow of information with several other pieces of hardware running simultaneously in a 'master' program, without the subprocesses blocking each other. My current approach is to boil the data down a little bit in the subprocess before saving to disk, but it'd be nice to pass the full monty to the 'master' process.

Kevinkevina answered 17/2, 2011 at 19:47 Comment(7)
Sounds like threading would suit you. – Kayleigh
@Gabi Purcaru Because I'm ignorant about threading. Feel free to educate me with an answer! – Kevinkevina
Avoid pickling numpy arrays. Use numpy.save(file, arr) instead. Pickling an array can use lots of intermediate memory (especially by default), and is rather slow. numpy.save is much more efficient. – Ferrite
Andrew, do you know the total size of the data beforehand? Or a maximum size? – Spirit
@Joe Kington: Good call. For ~200 MB arrays, numpy.save() gives a small time savings over numpy.dump() (7.3 s -> 6.5 s), but it cuts memory use in half. – Kevinkevina
@Sven Marnach: Good question. Ideally we'd acquire at top speed, forever, displaying processed data on screen and saving to disk until the disk filled up. Realistically I'm hitting tons of different bottlenecks, and fighting them one at a time. At the moment we can only acquire at ~GB/s for a fraction of a second before we have to pause to let processing catch up to acquisition. – Kevinkevina
@Andrew: I'm asking to verify whether the approach given in Joe's answer is feasible. I think you need to allocate shared memory before spawning child processes, so you would need to know how much shared memory to allocate. That's actually not the total amount of data, but rather the maximum amount of data to be transferred from the child to the master in one go. – Spirit

While googling around for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial it seems to share the same intellectual heritage (maybe largely the same authors? -- I'm not sure).

Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q,arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd=np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:]=rnd
        q.task_done()

if __name__=='__main__':
    N=10
    arr=shm.zeros(N,dtype=np.uint8)
    q=mp.JoinableQueue()    
    proc = mp.Process(target=worker, args=[q,arr])
    proc.daemon=True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()   
        print(arr.shape)
        print(arr)
    q.put('done')
    proc.join()

Running yields

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
Paisley answered 18/2, 2011 at 1:26 Comment(1)
Sorry it took me so long to accept the answer. I still haven't had time to test it myself; I'll report back here when I do. Thanks again! – Kevinkevina

Basically, you just want to share a block of memory between processes and view it as a numpy array, right?

In that case, have a look at the code below (posted to the numpy-discussion mailing list by Nadav Horesh a while back, not my work). There are a couple of similar implementations floating around (some more flexible), but they all essentially use this principle.

#    "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved


import numpy as N
import ctypes
import multiprocessing as MP

_ctypes_to_numpy = {
    ctypes.c_char   : N.dtype(N.uint8),
    ctypes.c_wchar  : N.dtype(N.int16),
    ctypes.c_byte   : N.dtype(N.int8),
    ctypes.c_ubyte  : N.dtype(N.uint8),
    ctypes.c_short  : N.dtype(N.int16),
    ctypes.c_ushort : N.dtype(N.uint16),
    ctypes.c_int    : N.dtype(N.int32),
    ctypes.c_uint   : N.dtype(N.uint32),
    ctypes.c_long   : N.dtype(N.int64),
    ctypes.c_ulong  : N.dtype(N.uint64),
    ctypes.c_float  : N.dtype(N.float32),
    ctypes.c_double : N.dtype(N.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))


def shmem_as_ndarray(raw_array, shape=None ):

    address = raw_array._obj._wrapper.get_address()
    size = len(raw_array)
    if (shape is None) or (N.asarray(shape).prod() != size):
        shape = (size,)
    elif type(shape) is int:
        shape = (shape,)
    else:
        shape = tuple(shape)

    dtype = _ctypes_to_numpy[raw_array._obj._type_]
    class Dummy(object): pass
    d = Dummy()
    d.__array_interface__ = {
        'data' : (address, False),
        'typestr' : dtype.str,
        'descr' :   dtype.descr,
        'shape' : shape,
        'strides' : None,
        'version' : 3}
    return N.asarray(d)

def empty_shared_array(shape, dtype, lock=True):
    '''
    Generate an empty MP shared array given ndarray parameters
    '''

    if type(shape) is not int:
        shape = N.asarray(shape).prod()
    try:
        c_type = _numpy_to_ctypes[dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[N.dtype(dtype)]
    return MP.Array(c_type, shape, lock=lock)

def emptylike_shared_array(ndarray, lock=True):
    'Generate an empty shared array with the size and dtype of a given array'
    return empty_shared_array(ndarray.size, ndarray.dtype, lock)
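
To connect this back to the question, here's a rough usage sketch, assuming the helper functions above are defined in the same script; the worker loop, queue protocol and array size are my own illustration, not part of Nadav's code:

import multiprocessing as MP
import numpy as N

def worker(q, shared_arr):
    # View the shared block as an ndarray; writes go straight into memory
    # the parent can also see, so nothing is copied or pickled.
    data = shmem_as_ndarray(shared_arr)
    while True:
        cmd = q.get()
        if cmd == 'done':
            break
        ##Fake data. In real life, get data from hardware.
        data[:] = N.random.randint(100)
        q.task_done()

if __name__ == '__main__':
    shared = empty_shared_array(1000000, N.uint8)  # allocate *before* starting the child
    view = shmem_as_ndarray(shared)                # the parent's view of the same memory
    q = MP.JoinableQueue()
    proc = MP.Process(target=worker, args=(q, shared))
    proc.start()

    for i in range(3):
        q.put('data')
        q.join()               # wait until the worker has filled the buffer
        print(view[:5])
    q.put('done')
    proc.join()
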
Ferrite answered 17/2, 2011 at 20:18 Comment(3)
I don't see how this can be used here. A multiprocessing.Array() would need to be created before spawning the subprocess, but in Andrew's code above the subprocess wants to create it. Am I missing something? – Spirit
@Sven - You're right, the code won't work as-is. However, it shouldn't be too hard to tweak things to work (or at least, I think I can make it work without too much trouble). Give me a bit, and I'll see if I can cobble something a bit more complete together... – Ferrite
This looks promising, looking forward to the cobbling. – Kevinkevina

From the other answers, it seems that numpy-sharedmem is the way to go.

However, if you need a pure-Python solution, or if installing extensions, Cython or the like is a (big) hassle, you might want to use the following code, which is a simplified version of Nadav's code:

import numpy, ctypes, multiprocessing

_ctypes_to_numpy = {
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))


def shm_as_ndarray(mp_array, shape = None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''

    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()

    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)

    if shape is not None:
        result = result.reshape(shape)

    return numpy.asarray(result)


def ndarray_to_shm(array, lock = False):
    '''Generate a 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''

    array1d = array.ravel(order = 'A')

    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]

    result = multiprocessing.Array(c_type, array1d.size, lock = lock)
    shm_as_ndarray(result)[:] = array1d
    return result

You would use it like this:

  1. Use sa = ndarray_to_shm(a) to convert the ndarray a into a shared multiprocessing.Array.
  2. Use multiprocessing.Process(target = somefunc, args = (sa, )) (and start, maybe join) to call somefunc in a separate process, passing the shared array.
  3. In somefunc, use a = shm_as_ndarray(sa) to get an ndarray pointing to the shared data. (Actually, you may want to do the same in the original process, immediately after creating sa, in order to have two ndarrays referencing the same data.) A minimal sketch of these three steps follows below.
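
Putting the steps together (a rough sketch; shm_as_ndarray and ndarray_to_shm come from the code above, while the array contents and somefunc are made up):

import multiprocessing
import numpy

def somefunc(sa, shape):
    # Step 3: re-wrap the shared Array as an ndarray (no copy is made).
    a = shm_as_ndarray(sa, shape)
    a *= 2                           # in-place changes are visible to the parent

if __name__ == '__main__':
    a = numpy.arange(6, dtype=numpy.float64).reshape(2, 3)
    sa = ndarray_to_shm(a)           # step 1: copy a into shared memory
    p = multiprocessing.Process(target=somefunc, args=(sa, a.shape))  # step 2
    p.start()
    p.join()
    print(shm_as_ndarray(sa, a.shape))   # the child's changes show up here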

AFAICS, you don't need to set lock to True, since shm_as_ndarray will not use the locking anyhow. If you need locking, you would set lock to True and call acquire/release on sa.

Also, if your array is not 1-dimensional, you might want to transfer the shape along with sa (e.g. use args = (sa, a.shape)).

This solution has the advantage that it does not need additional packages or extension modules, except multiprocessing (which is in the standard library).

Hauck answered 13/3, 2013 at 16:26 Comment(3)
I'm getting PicklingError: Can't pickle <class 'multiprocessing.sharedctypes.c_double_Array_<array size>'>: attribute lookup multiprocessing.sharedctypes.c_double_Array_<array size> failed. See my question here: #16303854 – Leopoldine
I just saw your comment by chance; obviously, I need to check my notification settings. Is there anything I should change in my answer, which was misleading for you? – Hauck
Well it was a long time ago :) – Leopoldine

Use threads. But I guess you are going to get problems with the GIL.

Instead: Choose your poison.

I know from the MPI implementations I work with that they use shared memory for on-node communication. You will have to code your own synchronization in that case.

At 2 GB/s you will run into problems with most "easy" methods, depending on your real-time constraints and available main memory.
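
If MPI is an option, mpi4py can move numpy arrays between processes through the buffer interface without pickling; the sketch below is only my illustration of that route (mpi4py is assumed to be installed, and it replaces the subprocess plumbing entirely). Launch with something like "mpiexec -n 2 python script.py":

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    ##Fake data. In real life, get data from hardware.
    data = numpy.zeros(1000000, dtype=numpy.uint8)
    # Uppercase Send/Recv use the buffer protocol: no pickling, and on a
    # single node most MPI implementations go through shared memory.
    comm.Send(data, dest=1, tag=0)
elif rank == 1:
    buf = numpy.empty(1000000, dtype=numpy.uint8)
    comm.Recv(buf, source=0, tag=0)
    print(buf.shape)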

Resistance answered 17/2, 2011 at 19:56 Comment(0)

One possibility to consider is to use a RAM drive for the temporary storage of files to be shared between processes. A RAM drive is where a portion of RAM is treated as a logical hard drive, to which files can be written/read as you would with a regular drive, but at RAM read/write speeds.

This article describes using the ImDisk software (for MS Windows) to create such a disk, and reports file read/write speeds of 6-10 gigabytes/second: https://www.tekrevue.com/tip/create-10-gbs-ram-disk-windows/

An example in Ubuntu: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871

Another benefit is that files in arbitrary formats can be passed around this way: e.g. Pickle, JSON, XML, CSV, HDF5, etc.

Keep in mind that anything stored on the RAM disk is wiped on reboot.
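
For example, on Linux /dev/shm is usually already a RAM-backed tmpfs, so the question's original save-to-disk code can simply be pointed at it (a sketch; the path and array size are illustrative):

import os
import numpy

path = '/dev/shm/data.npy'
data = numpy.zeros(1000000, dtype=numpy.uint8)
numpy.save(path, data)        # "write to disk", but the disk is RAM

a = numpy.load(path)          # done by the consumer process in real life
print(a.shape)
os.remove(path)               # tmpfs contents vanish on reboot anyway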

Blues answered 2/1, 2018 at 17:37 Comment(0)

Use threads. You probably won't have problems with the GIL.

The GIL only affects Python code, not C-, Fortran-, or Cython-backed libraries. Most numpy operations and a good chunk of the C-backed scientific Python stack release the GIL and can operate just fine on multiple cores. This blog post discusses the GIL and scientific Python in more depth.

Edit

Simple ways to use threads include the threading module and multiprocessing.pool.ThreadPool.
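
As a minimal sketch with multiprocessing.pool.ThreadPool (the acquire_data function and sizes are illustrative): the acquisition runs in a worker thread, so the array never leaves the process and nothing is serialized or copied.

from multiprocessing.pool import ThreadPool
import numpy

def acquire_data(n):
    ##Fake data. In real life, read from hardware; the read (like most
    ##heavy numpy operations) can release the GIL while it runs.
    return numpy.zeros(n, dtype=numpy.uint8)

pool = ThreadPool(processes=1)
result = pool.apply_async(acquire_data, (1000000,))

# ... the master program coordinates the other hardware here ...

data = result.get()           # the array object itself, no copy, no pickling
print(data.shape)
pool.close()
pool.join()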

Barilla answered 3/5, 2015 at 17:15 Comment(2)
This looks promising! Is your recommendation to use the 'dask' module, or is there an even simpler way to parallelize numpy? Would you be able to add a minimal code example of what you're thinking of? – Kevinkevina
I've added a quick edit pointing people to threading and multiprocessing.pool.ThreadPool, both of which offer fairly simple ways to execute functions asynchronously. – Barilla
