I am using pyarrow and am struggling to understand the big difference in memory usage between the `Dataset.to_batches` method and `ParquetFile.iter_batches`.
Using `pyarrow.dataset`:
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>>
>>> filelist = [
... 'local_data/file1.parquet'
... ]
>>> input_dataset = ds.dataset(source=filelist)
>>> sink = pa.BufferOutputStream()
>>> writer = pa.ipc.new_stream(sink, input_dataset.schema)
>>> ds_batches = input_dataset.to_batches(batch_size=500, batch_readahead=0, fragment_readahead=0)
>>> for _ in range(4):
...     writer.write_batch(next(ds_batches))
...
>>> buf = sink.getvalue()
>>> buf.size
5637392
Looking at memory consumption in Task Manager: before the line `ds_batches = input_dataset.to_batches(batch_size=500, batch_readahead=0, fragment_readahead=0)` it sits around 44 MB; once I execute that line it jumps to ~815 MB. This is surprising because I thought `to_batches` returns an iterator and that memory would only start to be consumed once I begin iterating. I did notice that memory remains stable even as I iterate over the batches.
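To take Task Manager out of the equation, the same effect can also be checked against Arrow's own memory pool with `pyarrow.total_allocated_bytes()`. A minimal sketch, using the same placeholder file as above:

import pyarrow as pa
import pyarrow.dataset as ds

input_dataset = ds.dataset(source=['local_data/file1.parquet'])
print(pa.total_allocated_bytes())  # bytes held by Arrow's default memory pool

# does creating the iterator already allocate the bulk of the memory?
ds_batches = input_dataset.to_batches(batch_size=500, batch_readahead=0, fragment_readahead=0)
print(pa.total_allocated_bytes())

# ...and how much more does pulling the first batch add?
next(ds_batches)
print(pa.total_allocated_bytes())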
Using `pyarrow.parquet.ParquetFile`:
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>>
>>> filelist = [
... 'local_data/file1.parquet'
... ]
>>> pqfile = pq.ParquetFile(filelist[0])
>>> sink = pa.BufferOutputStream()
>>> writer = pa.ipc.new_stream(sink, pqfile.schema_arrow)
>>> pqiterator = pq.ParquetFile(filelist[0]).iter_batches(batch_size=500)
>>> for _ in range(4):
...     writer.write_batch(next(pqiterator))
...
>>> buf = sink.getvalue()
>>> buf.size
5636632
Behavior here is more in line with what I expected: memory consumption does not change once I execute `pqiterator = pq.ParquetFile(filelist[0]).iter_batches(batch_size=500)`, and it remains at a very low 19 MB.
Note that at the end of both scenarios the buffer is around 5 MB, and if I read back out of that buffer I get the same (and expected) number of records.
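For reference, the read-back check is only a few lines. A minimal sketch; with four batches of batch_size=500 it should report 2000 rows, assuming every batch was full:

reader = pa.ipc.open_stream(buf)  # open the IPC stream over the in-memory buffer
table = reader.read_all()         # materialize the written batches as a Table
print(table.num_rows)             # expecting 4 * 500 = 2000 if every batch was full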
- Question 1: What explains the high memory consumption of `Dataset.to_batches`?
- Question 2: Why is there such a difference between `Dataset.to_batches` and `ParquetFile.iter_batches`, both of which basically allow me to iterate over batches of records?
Notes:
- The documentation on `to_batches` says "Read the dataset as materialized record batches." I wondered if "materialized" actually meant it was returning all batches, but that doesn't seem right either, as all the batches in the file would take up a lot more space.
- I see the same results if I use `Dataset.scanner()` instead of `Dataset.to_batches`, even though the documentation for `scanner` says "Data is not loaded immediately. Instead, this produces a Scanner, which exposes further operations." (A sketch of what I mean is below.)
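The `scanner()` variant I mean looks roughly like this, with the same placeholder file and the same parameters as the `to_batches` call above:

import pyarrow.dataset as ds

input_dataset = ds.dataset(source=['local_data/file1.parquet'])
scanner = input_dataset.scanner(batch_size=500, batch_readahead=0, fragment_readahead=0)
# memory climbs as soon as the scan is set up, before the first batch is consumed
for batch in scanner.to_batches():
    print(batch.num_rows)
    break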