How to use Apache Arrow IPC from multiple processes (possibly from different languages)?
I'm not sure where to begin, so I'm looking for some guidance. I want to create some arrays/tables in one process and have them accessible (read-only) from another.

So I create a pyarrow.Table like this:

import pyarrow as pa

a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])

a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
#   0,
#   1,
#   2
# ]

a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
#   "foo",
#   "bar",
#   "baz"
# ]

tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])

tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]

Now how do I read this from a different process? I thought I would use multiprocessing.shared_memory.SharedMemory, but that didn't quite work:

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out:
    for batch in tbl.to_batches():
        out.write(batch)

# TypeError: Unable to read from object of type: <class 'memoryview'>

Do I need to wrap the shm.buf with something?

Even if I get this to work, it seems very fiddly. How would I do this in a robust manner? Do I need something like zmq?

I'm not clear how this is zero copy though. When I write the record batches, isn't that serialisation? What am I missing?

In my real use case, I also want to talk to Julia, but maybe that should be a separate question when I come to it.

PS: I have gone through the docs, but they didn't clarify this part for me.

Silverplate answered 8/2, 2023 at 23:34

Do I need to wrap the shm.buf with something?

Yes, you can use pa.py_buffer() to wrap it:

size = calculate_ipc_size(table)
shm = shared_memory.SharedMemory(create=True, name=name, size=size)

stream = pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf))
with pa.RecordBatchStreamWriter(stream, table.schema) as writer:
    writer.write_table(table)

Also, for size you need to calculate the size of the IPC output, which may be a bit larger than Table.nbytes. The function you can use for that is:

def calculate_ipc_size(table: pa.Table) -> int:
    sink = pa.MockOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.size()
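
For the reading side, a minimal sketch might look like the following (the segment name "pa_test" and the variable names are just placeholders, not something from the question): attach to the segment by name, wrap its buffer with pa.py_buffer(), and open it with pa.ipc.open_stream().

import pyarrow as pa
from multiprocessing import shared_memory

# Attach to the segment created by the writing process ("pa_test" is a
# placeholder; use whatever name the writer chose).
shm = shared_memory.SharedMemory(name="pa_test")

# Wrap the raw shared-memory buffer so Arrow can read the IPC stream from it.
buf = pa.py_buffer(shm.buf)

with pa.ipc.open_stream(buf) as reader:
    table = reader.read_all()

# Note: `table` references memory inside `shm`, so keep `shm` open (and the
# segment alive) for as long as the table is in use.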

How would I do this in a robust manner?

Not sure of this part yet. In my experience the original process needs to stay alive while the others are reusing the buffers, but there might be a way to get around that. This is likely connected to this bug in CPython: https://bugs.python.org/issue38119
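
One convention that helps (a sketch of ownership discipline, not a fix for the CPython issue) is to make the creating process own the segment's lifetime: readers only close() their handle, and only the creator calls unlink() once everyone is done.

# Owner / writer process: keeps the segment alive until readers are done.
shm = shared_memory.SharedMemory(name="pa_test", create=True,
                                 size=calculate_ipc_size(table))
# ... write the IPC stream, then signal readers and wait for them ...
shm.close()
shm.unlink()  # actually frees the segment; only the owner should do this

# Reader process: attaches by name and never unlinks.
shm = shared_memory.SharedMemory(name="pa_test")
# ... read the table ...
shm.close()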

I'm not clear how this is zero copy though. When I write the record batches, isn't that serialisation? What am I missing?

You are correct that writing the Arrow data into an IPC buffer does involve copies. The zero-copy part is when other processes read the data from shared memory. The columns of the Arrow table will reference the relevant segments of the IPC buffer, rather than a copy.
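
If you want to convince yourself of that, one way (a sketch that assumes the `table`, `buf`, and `shm` from the reading example above) is to check that the buffers backing a column fall inside the shared-memory region:

# Verify that the column data lives inside the shared-memory segment,
# i.e. that reading did not copy it.
region_start = buf.address
region_end = buf.address + buf.size

for chunk in table.column("num").chunks:
    for b in chunk.buffers():
        if b is not None:
            assert region_start <= b.address < region_end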

Conceptionconceptual answered 9/2, 2023 at 17:58
Thanks for the thorough answer! And for the link to the Python issue :) I'm wondering if your implementation of calculate_ipc_size also serialises the data? That would mean I end up serialising twice. I intend to share a large-ish table, so it might mean a noticeable delay. – Silverplate
You do serialize twice, sort of. But since calculate_ipc_size writes to a MockOutputStream, that first pass is very fast: no bytes are actually written, it just counts them. On my machine I am able to write about 10 gigabytes per second into shared memory. – Conceptionconceptual
Ah cool! Thanks a lot for the testing :) That's plenty fast for my use case :) – Silverplate
