Fastest way to write a numpy array in Arrow format

I'm looking for fast ways to store and retrieve a numpy array using pyarrow. I'm pretty satisfied with retrieval: it takes less than 1 second to extract columns from my .arrow file containing 1,000,000,000 integers of dtype = np.uint16.

import pyarrow as pa
import numpy as np

def write(arr, name):
    # Build one Arrow array per row of `arr`, named by its index.
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)

def read(name):
    # Memory-map the file, read the whole stream back as a Table,
    # then hand each column back to numpy.
    source = pa.memory_map(name, 'r')
    table = pa.ipc.RecordBatchStreamReader(source).read_all()
    for i in range(table.num_columns):
        yield table.column(str(i)).to_numpy()

arr = np.random.randint(65535, size=(250, 4000000), dtype=np.uint16)

%%timeit -r 1 -n 1
write(arr, 'test.arrow')
>>> 25.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%%timeit -r 1 -n 1
for n in read('test.arrow'): n
>>> 901 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Can the efficiency of writing to the .arrow format be improved? In addition, I tested np.save:

%%timeit -r 1 -n 1
np.save('test.npy', arr)
>>> 18.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

It looks a little bit faster. Can Apache Arrow be optimised further for writing to the .arrow format?
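
For reference, a raw-IO baseline could also be timed to separate format overhead from disk speed: numpy can dump the array's buffer straight to disk with ndarray.tofile, with no container format at all. A minimal sketch (the file name test.bin is just for illustration; I did not record timings for it above):

import numpy as np

def write_raw(arr, name):
    # Sketch: dump the raw array buffer to disk with no metadata at all.
    # Reading it back requires knowing the dtype and shape separately.
    arr.tofile(name)

write_raw(arr, 'test.bin')  # hypothetical file name, not benchmarked above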

Nunatak asked 9/11, 2021 at 16:44. Comments (3):
If you are running into IO bottlenecks, using LZ4 compression as a write option might be worthwhile. It would slow down reading, though, because the data is no longer zero-copy. If you do have multiple columns, Arrow should compress them in parallel, so the difference might not be too bad (see the sketch after these comments). - Gunn
Does np.random.randint() return a generator or a similarly lazy structure? Might you be timing the random number generation as well as the writes? (When I write to parquet from pandas, it's much faster than that, even to an HDD.) - Charissa
@Charissa No, it's not lazy. I'm timing just the write and read, as shown in my code. I really wonder what is wrong with my setup. I've done it on Google Colab too and got 16 s for writing and 24 ms for reading. I'm going to try different methods as well. - Nunatak
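
Regarding the LZ4 suggestion in the comments above, a minimal sketch of what that write option looks like: it keeps the same column layout as write() and only adds IpcWriteOptions(compression='lz4'). Whether it helps depends on how IO-bound the machine is, and reads are no longer zero-copy:

import pyarrow as pa

def write_lz4(arr, name):
    # Sketch: same layout as write(), but with LZ4 compression on the IPC stream.
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    options = pa.ipc.IpcWriteOptions(compression='lz4')
    with pa.OSFile(name, 'wb') as sink:
        with pa.ipc.new_stream(sink, batch.schema, options=options) as writer:
            writer.write_batch(batch)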

It may be the case that the performance issue is mainly due to IO/disk speed, in which case there isn't much you can improve.

I ran a few tests on my device. The numbers I get are different from yours, but the bottom line is the same: writing is slower than reading.

The resulting file is 1.9 GB (2000023184 bytes):

$ ls test.arrow -l
-rw-rw-r-- 1 0x26res 0x26res 2000023184 Nov 15 10:01 test.arrow

In the code below I generate 1.9 GB of random bytes and save them, then compare that with the time it takes to save with arrow:

import secrets

data = b"\x00" + secrets.token_bytes(2000023184)  + b"\x00"

def write_bytes(data, name):
    with open(name, 'wb') as fp:
        fp.write(data)

%%timeit -r 1 -n 1
write_bytes(data, 'test.bytes')
>>> 2.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%%timeit -r 1 -n 1
write(arr, 'test.arrow')
>>> 2.52 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

On my device, it takes 2.52 seconds to write the data using arrow. If I write the same amount of random bytes directly, it takes 2.29 seconds. That means the overhead of Arrow is about 10% of the write time, so there isn't much that can be done to speed it up.
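
One way to double-check that conclusion is to time the serialization alone, with the disk taken out of the picture. A minimal sketch (untimed here), writing the same record batch to an in-memory pa.BufferOutputStream instead of a file:

import pyarrow as pa

def serialize_to_memory(arr):
    # Sketch: identical to write(), except the sink is an in-memory buffer,
    # so the timing excludes disk IO and only covers Arrow's serialization work.
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue()  # pa.Buffer holding the serialized stream

If this runs much faster than writing test.arrow, the bottleneck is the disk; if it is comparable, the time is going into building and copying the Arrow buffers themselves.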

Thermolabile answered 15/11, 2021 at 10:14. Comments (1):
You're right. That's indeed some kind of issue with my RAM/IO/disk speed. 2.5 seconds is really good. Hopefully there are some workarounds in pyarrow. - Nunatak

Indeed, it appears to be some kind of limitation of my RAM/IO/disk, and a very silent one... Writing slows down 3-8 times once arr exceeds about 200M items, which is why I'm seeing the write time grow from 2.5 seconds to 20 seconds. I would be glad to know if this could be resolved in pyarrow.

def pyarrow_write_arrow_Batch(arr, name):
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)

%matplotlib notebook
import benchit
benchit.setparams(environ='notebook')
benchit.setparams(rep=5)

arr = np.random.randint(65535, size=(int(1e9),), dtype=np.uint16)
size = [4, 8, 12, 20, 32, 48, 64, 100, 160, 256, 400, 600, 1000]

def pwa_Batch_10000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 10000), name)
def pwa_Batch_100000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 100000), name)
def pwa_Batch_1000000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 1000000), name)
def pwa_Batch_4000000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 4000000), name)

fns = [pwa_Batch_10000, pwa_Batch_100000, pwa_Batch_1000000, pwa_Batch_4000000]
in_ = {s: (arr[:s*int(1e6)], 'test.arrow') for s in size}
t = benchit.timings(fns, in_, multivar=True, input_name='Millions of items')
t.plot(logx=True, figsize=(8,4), fontsize=10)

(Plot: benchit write timings for the four reshape variants against millions of items, log-scaled x axis.)
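
One workaround that might be worth adding to this benchmark (a sketch I have not verified against the slowdown above) is to stream the data out as many smaller record batches instead of a single batch per file, so that each write call serializes a more modest amount of data:

import pyarrow as pa

def pyarrow_write_arrow_chunked(arr, name, rows_per_batch=100000):
    # Sketch: same columns and names as pyarrow_write_arrow_Batch, but the table
    # is emitted as several smaller record batches instead of one large batch.
    # rows_per_batch is a hypothetical tuning knob, not a value tested above.
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    table = pa.Table.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.ipc.new_stream(sink, table.schema) as writer:
            for batch in table.to_batches(max_chunksize=rows_per_batch):
                writer.write_batch(batch)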

Nunatak answered 21/11, 2021 at 2:44
