pyarrow memory consumption difference between Dataset.to_batches and ParquetFile.iter_batches

I am using pyarrow and am struggling to understand the big difference in memory usage between the Dataset.to_batches method and ParquetFile.iter_batches.

Using pyarrow.dataset

>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>>
>>> filelist = [
...     'local_data/file1.parquet'
... ]
>>> input_dataset = ds.dataset(source=filelist)
>>> sink = pa.BufferOutputStream()
>>> writer = pa.ipc.new_stream(sink, input_dataset.schema)
>>> ds_batches = input_dataset.to_batches(batch_size=500, batch_readahead=0, fragment_readahead=0)
>>> for _ in range(4):
...     writer.write_batch(next(ds_batches))
...
>>> buf = sink.getvalue()
>>> buf.size
5637392

Looking at memory consumption in Task Manager: before the line ds_batches = input_dataset.to_batches(batch_size=500, batch_readahead=0, fragment_readahead=0) it is around 44MB; once I execute that line it jumps to ~815MB. This is surprising because I thought to_batches returned an iterator, so memory should only start to be consumed once I begin iterating. I did notice that memory remains stable as I iterate over the batches.
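To separate what the OS reports from what Arrow itself has allocated, one thing I can check is the default memory pool. This is just a minimal sketch along the lines of the snippet above (same file, same parameters), using pa.total_allocated_bytes() to see whether the ~800MB lives in Arrow's allocator at all:

import pyarrow as pa
import pyarrow.dataset as ds

# Compare Arrow's own allocator stats before and after creating the batch
# iterator, to see whether the jump visible in Task Manager is memory held by
# Arrow or something else (e.g. memory-mapped / OS-buffered file data).
filelist = ['local_data/file1.parquet']
input_dataset = ds.dataset(source=filelist)

print("before:", pa.total_allocated_bytes())
ds_batches = input_dataset.to_batches(
    batch_size=500, batch_readahead=0, fragment_readahead=0)
print("after to_batches:", pa.total_allocated_bytes())

first = next(ds_batches)
print("after first batch:", pa.total_allocated_bytes())
print("one batch:", first.num_rows, "rows,", first.nbytes, "bytes")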

Using pyarrow.parquet.ParquetFile

>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>>
>>> filelist = [
...     'local_data/file1.parquet'
... ]
>>> pqfile = pq.ParquetFile(filelist[0])
>>> sink = pa.BufferOutputStream()
>>> writer = pa.ipc.new_stream(sink, pqfile.schema_arrow)
>>> pqiterator = pq.ParquetFile(filelist[0]).iter_batches(batch_size=500)
>>> for _ in range(4):
...     writer.write_batch(next(pqiterator))
...
>>> buf = sink.getvalue()
>>> buf.size
5636632

Behavior here is more in line with what I expected. Memory consumption does not change when I execute pqiterator = pq.ParquetFile(filelist[0]).iter_batches(batch_size=500), and it remains at a very low ~19MB.

Note that at the end of both scenarios the buffer size is around 5MB, and if I read back out of the buffer I get the same (and expected) number of records in each case.

  • Question 1: what explains the high memory consumption of Dataset.to_batches?
  • Question 2: why is there such a difference between Dataset.to_batches and ParquetFile.iter_batches, both of which basically allow me to iterate over batches of records?

Notes:

  1. The documentation on to_batches says

Read the dataset as materialized record batches.

I wondered whether "materialized" meant that all batches were being returned up front, but that doesn't seem right either, since all of the batches in the file would take up a lot more space.

  2. I see the same results if I use Dataset.scanner() instead of Dataset.to_batches (a sketch of the scanner variant is below the quote), even though the documentation for scanner says

Data is not loaded immediately. Instead, this produces a Scanner, which exposes further operations
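
For reference, this is roughly the scanner variant I tried: a minimal sketch that mirrors the to_batches() parameters from the first snippet, assuming the same file.

import pyarrow as pa
import pyarrow.dataset as ds

filelist = ['local_data/file1.parquet']
input_dataset = ds.dataset(source=filelist)

# Explicit Scanner with the same batch size and readahead settings as the
# to_batches() call above, iterated the same way.
scanner = input_dataset.scanner(
    batch_size=500, batch_readahead=0, fragment_readahead=0)

sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, input_dataset.schema)
batches = scanner.to_batches()
for _ in range(4):
    writer.write_batch(next(batches))
writer.close()
buf = sink.getvalue()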

Ting asked 4/8, 2023 at 1:11

Comments (3):

  • Were you able to find an answer? I too stumbled upon this issue yesterday; in my case ParquetFile was much faster (200ms vs 1.3s) than the dataset approach. (Radio)
  • @Radio unfortunately I've not yet found an answer. I've shifted to working on something else for the time being, so I haven't been actively looking. (Ting)
  • My main problem here is not so much speed as the lack of memory predictability. I have data (with the same schema) coming from different sources, some more densely packed than others, resulting in different memory footprints for the same batch size. When I return to working on this, I want to figure out how to read with a dynamic batch size so as to keep memory consumption below some threshold; a rough sketch of the idea is below. I will update this question if I'm able to do so. (Ting)
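
For what it's worth, here is the dynamic-batch-size idea sketched very roughly. The TARGET_BYTES budget is hypothetical (not from the question), and total_byte_size is the encoded row-group size, so it only approximates the decoded in-memory footprint.

import pyarrow.parquet as pq

# Pick a batch size per file from the Parquet metadata so that one batch stays
# under a memory budget. TARGET_BYTES is an assumed budget, and total_byte_size
# is the encoded row-group size, so this only approximates the decoded
# in-memory footprint.
TARGET_BYTES = 64 * 1024 * 1024

pqfile = pq.ParquetFile('local_data/file1.parquet')
rg = pqfile.metadata.row_group(0)
bytes_per_row = max(1, rg.total_byte_size // max(1, rg.num_rows))
batch_size = max(1, TARGET_BYTES // bytes_per_row)

for batch in pqfile.iter_batches(batch_size=batch_size):
    pass  # process the batch; each batch should be roughly TARGET_BYTES or less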
