Create Parquet files from a stream in Python in a memory-efficient manner

It appears the most common way in Python to create Parquet files is to first create a Pandas dataframe and then use pyarrow to write the table to Parquet. I worry that this might be overly taxing on memory usage, as it requires at least one full copy of the dataset to be stored in memory in order to create the pandas dataframe.
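
For reference, the approach described above looks roughly like the sketch below (the records list, column names, and file name are placeholders); the entire dataset is materialized as a DataFrame before anything is written:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# 'records' stands in for the full dataset, already collected in memory
records = [(1, 2), (3, 4)]
df = pd.DataFrame(records, columns=['a', 'b'])

# Convert to an Arrow table and write it out in one go
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, 'output.parquet')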

I wonder if loading the entire dataset into memory is required due to the columnar compression requirements, or if there's a more efficient, stream-based approach. In my case, I will receive the records in a streaming fashion. For a similar CSV output process, we write rows out to disk in batches of 1000, so the number of rows held in memory never reaches the size of the full dataset.

Should I:

  1. Just create a pandas dataframe and then write it to the parquet. (Meaning the entire dataset will need to be stored in memory, but we treat this as a necessary requirement.)
  2. Use some streaming-friendly way to write 1000 or so rows at a time as we receive them, minimizing the total point-in-time ram consumption over the course of the process. (I didn't see any documentation on how to do this and I'm not sure it's even an option for parquet.)
  3. Write everything to CSV and then use a function that smartly reads/analyzes the CSV contents and creates the compressed parquet after-the-fact. (Slower runtime perhaps, but a lower memory profile and a lower chance of failing on a very large file; a streaming sketch of this conversion follows this list.)
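
For option 3, the CSV-to-Parquet conversion itself does not have to load the whole file either: pyarrow's CSV module can read the file incrementally. A minimal sketch, assuming the file names are placeholders and the schema is inferred from the first block of the CSV:

import pyarrow.csv as pacsv
import pyarrow.parquet as pq

# open_csv streams the CSV as a sequence of record batches
reader = pacsv.open_csv('input.csv')

# The inferred schema is available as soon as the reader is opened
with pq.ParquetWriter('output.parquet', schema=reader.schema) as writer:
    for batch in reader:
        writer.write_batch(batch)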

Thoughts? Suggestions?

Unclasp answered 11/11, 2020 at 17:48

Comment (Cryology): Writing multiple row groups will work - https://mcmap.net/q/332841/-convert-csv-to-parquet-file-using-python

> Use some streaming-friendly way to write 1000 or so rows at a time as we receive them, minimizing the total point-in-time ram consumption over the course of the process.

You can do this.

> (I didn't see any documentation on how to do this and I'm not sure it's even an option for parquet.)

At least now there is a bit of documentation at https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html on how to do this - specifically the write_batch function.


Here is an example, although it would have to be tweaked a bit depending on the data source - for example if it already comes "chunked", or if the schema has to be inferred from the data rather than hard-coded as here.

The example also goes via Pandas, mostly because it's a convenient way of converting from rows to columns to create each RecordBatch, but there are other ways of creating each RecordBatch that don't need pandas (a pandas-free sketch follows the example below).

import itertools
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Any iterable that yields rows as tuples
def get_rows():
    for i in range(0, 10000):
        yield (1, 2)

# Chunk the rows into arrow batches
def get_batches(rows_iterable, chunk_size, schema):
    rows_it = iter(rows_iterable)
    while True:
        batch = pa.RecordBatch.from_pandas(
            pd.DataFrame(itertools.islice(rows_it, chunk_size), columns=schema.names),
            schema=schema, preserve_index=False,
        )
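        # An empty batch means the source iterator is exhausted, so stop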
        if not batch:
            break
        yield batch

# Could be inferred from data, but note that the schema has to be
# known when creating the ParquetWriter object
schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
])
rows = get_rows()
batches = get_batches(rows, chunk_size=1000, schema=schema)

# Write the batches
with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for batch in batches:
        writer.write_batch(batch)
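
As noted above, each RecordBatch can also be built without pandas. One way (with an illustrative chunk of rows and the same schema as the example) is to transpose the row tuples into columns and use pa.RecordBatch.from_arrays:

import pyarrow as pa

schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
])

# One chunk of rows, as itertools.islice would produce in the example above
chunk = [(1, 2), (3, 4), (5, 6)]

# Transpose rows into per-column sequences and build typed arrays directly
columns = list(zip(*chunk)) if chunk else [[] for _ in schema]
arrays = [pa.array(col, type=field.type) for col, field in zip(columns, schema)]
batch = pa.RecordBatch.from_arrays(arrays, schema=schema)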

Ephor answered 19/9, 2022 at 9:43

Comment (Mirza): Do you know an example using the Arrow C++ functions?
Comment (Susceptible): This is such a good answer, in so many ways. I wish I could nominate this answer for "Answer of the Year" or something like that!

You want Row Groups in Parquet. See here for an explanation of what they are, but the short version is that the columnar data is split into chunks of a limited number of rows, and each chunk can be appended to the file separately. You can use PyArrow to implement this for a stream of incoming data.
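
A minimal sketch of this with PyArrow (file name and chunk size are illustrative): each write call below appends one row group, which the file metadata confirms afterwards.

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([('a', pa.int32())])

# Each write_table call appends a separate row group to the file
with pq.ParquetWriter('row_groups.parquet', schema=schema) as writer:
    for start in range(0, 3000, 1000):
        chunk = pa.table({'a': list(range(start, start + 1000))}, schema=schema)
        writer.write_table(chunk)

print(pq.ParquetFile('row_groups.parquet').metadata.num_row_groups)  # 3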

Omar answered 13/11, 2020 at 12:43
