Incrementally writing a Parquet dataset from Python
I am writing larger-than-RAM data out from my Python application, basically dumping data from SQLAlchemy to Parquet. My solution was inspired by this question. Even after increasing the batch size as hinted there, I am facing these issues:

  • RAM usage grows heavily

  • The writer starts to slow down after a while (write throughput drops more than 5x)

My assumption is that this happens because ParquetWriter metadata management becomes expensive as the number of rows increases. I am thinking that I should switch to datasets, which would allow the writer to close the file in the middle of processing and flush out the metadata. (A rough sketch of what I mean is below, after my distilled code.)

My questions are:

  • Is there an example of writing incremental Parquet datasets with Python?

  • Are my assumptions correct, and would using datasets help to maintain the writer throughput?

My distilled code:


import os
import time

import psutil
import pyarrow as pa
import pyarrow.parquet as pq

with pq.ParquetWriter(
        fname,
        Candle.to_pyarrow_schema(small_candles),
        compression='snappy',
        allow_truncated_timestamps=True,
        version='2.0',  # Highest available format version
        data_page_version='2.0',  # Highest available data page version
) as writer:

    def writeout():
        nonlocal data
        duration = time.time() - stats["started"]
        throughput = stats["candles_processed"] / duration
        logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughput)
        writer.write_table(
            pa.Table.from_pydict(
                    data,
                    writer.schema
            )
        )
        data = dict.fromkeys(data.keys(), [])
        process = psutil.Process(os.getpid())
        logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())

    # Use a massive yield_per() or otherwise we are leaking memory
    for item in query.yield_per(100_000):
        frame = construct_frame(row_type, item)
        for key, value in frame.items():
            data[key].append(value)

        stats["candles_processed"] += 1

        # Do regular checkpoints to avoid running out of memory
        # and to log the progress to the console.
        # For fine-tuning the Parquet writer see
        # https://issues.apache.org/jira/browse/ARROW-10052
        if stats["candles_processed"] % 100_000 == 0:
            writeout()
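
What I have in mind with datasets is roughly the sketch below: instead of one ever-growing file, rotate to a new part file inside a directory every N rows, so that each file's footer metadata stays small, and read the directory back as one logical dataset afterwards. This is only a sketch; the rotate_every value, the part-file naming and the write_batches_to_dataset helper are made up, and I have not verified that this keeps throughput constant.

import os

import pyarrow.dataset as ds
import pyarrow.parquet as pq


def write_batches_to_dataset(batches, out_dir, schema, rotate_every=1_000_000):
    """Write an iterable of pyarrow.Table chunks, rotating part files."""
    os.makedirs(out_dir, exist_ok=True)
    writer = None
    rows_in_file = 0
    part = 0
    for batch in batches:
        if writer is None:
            writer = pq.ParquetWriter(
                os.path.join(out_dir, f"part-{part:05d}.parquet"),
                schema,
                compression="snappy",
            )
        writer.write_table(batch)
        rows_in_file += batch.num_rows
        if rows_in_file >= rotate_every:
            # Closing the writer flushes this file's footer metadata,
            # so no single writer accumulates metadata for all rows.
            writer.close()
            writer = None
            rows_in_file = 0
            part += 1
    if writer is not None:
        writer.close()


# The directory can then be read back as one logical dataset:
# table = ds.dataset(out_dir, format="parquet").to_table()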
Ogee answered 14/7, 2021 at 9:14
1. Your throughput measures the time it takes to extract records, convert them, and write them to Parquet. I think you should try to measure each step individually to pinpoint exactly what the issue is. It may be Parquet, but it may be the rest of your code. 2. Why call data = dict.fromkeys(data.keys(), []) and not data.clear()? If your keys are unique across all records, you're leaking memory as data would keep growing. – Ebb
In this case, the reason was the incorrect use of Python lists and dicts as a working buffer, as pointed out by @0x26res.

After making sure the dictionary of lists is cleared correctly, the memory consumption issues became negligible.
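
For anyone who hits the same issue, the difference is roughly the sketch below (the column names are made up). dict.fromkeys(data.keys(), []) gives every key the same list object, so every subsequent append, from every column, goes into one shared list, while rebuilding the dict with a fresh list per key keeps the columns independent and lets the old buffers be garbage collected.

# Buggy reset: every key shares the *same* empty list object.
data = dict.fromkeys(["open", "close"], [])
data["open"].append(1.0)
data["close"].append(2.0)
assert data["open"] is data["close"]   # one shared list: [1.0, 2.0]

# Correct reset: a fresh, independent list per key, so the old
# (potentially huge) lists can be garbage collected.
data = {key: [] for key in data}
data["open"].append(1.0)
assert data["close"] == []             # columns no longer share storage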

Ogee answered 14/7, 2021 at 10:00
