How can I chunk through a CSV using Arrow?

What I am trying to do

I am using PyArrow to read some CSVs and convert them to Parquet. Some of the files I read have many columns and a high memory footprint (enough to crash the machine running the job). I want to chunk through the file while reading the CSV, similar to how Pandas read_csv with chunksize works.

For example, this is how the chunking code would work in Pandas:

import pandas

chunks = pandas.read_csv(data, chunksize=100, iterator=True)

# Iterate through chunks
for chunk in chunks:
    do_stuff(chunk)

I want to port similar functionality to Arrow.

What I have tried to do

I noticed that Arrow has ReadOptions, which includes a block_size parameter, and I thought maybe I could use it like this:

import pyarrow.csv as arrow_csv

# Read an in-memory CSV file
arrow_table = arrow_csv.read_csv(
    input_file=input_buffer,
    read_options=arrow_csv.ReadOptions(
        use_threads=True,
        block_size=4096
    )
)

# Iterate through batches
for batch in arrow_table.to_batches():
    do_stuff(batch)

Since read_csv does not return an iterator even with block_size set, my impression is that Arrow will still read the entire table into memory and thus recreate my problem.
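For what it's worth, a quick check along these lines (a sketch, reusing arrow_table from above) would show that the whole table is already materialized before to_batches() is called:

# nbytes reports the total size of the buffers backing the table,
# so by this point all of the data is already in memory
print(arrow_table.num_rows, arrow_table.nbytes)
# to_batches() only produces zero-copy views over that same data
print(len(arrow_table.to_batches()))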

Lastly, I am aware that I can first read the CSV using Pandas, chunk through it, and then convert each chunk to an Arrow table. But I am trying to avoid using Pandas and only use Arrow.
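
For reference, this is roughly what that Pandas-based workaround would look like (a sketch; data and do_stuff are the same placeholders as above):

import pandas
import pyarrow as pa

for chunk in pandas.read_csv(data, chunksize=100):
    # Convert each Pandas chunk to an Arrow table before processing
    arrow_chunk = pa.Table.from_pandas(chunk)
    do_stuff(arrow_chunk)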

I am happy to provide additional information if needed

Madra answered 28/7, 2021 at 5:54 Comment(1)
If you just want to convert some CSVs to Parquet, try this nice Rust-based CLI tool: csv2parquet. I found it much easier than messing around trying to get pyarrow to play nicely. – Skidmore

The function you are looking for is pyarrow.csv.open_csv, which returns a pyarrow.csv.CSVStreamingReader. The size of the batches will be controlled by the block_size option you noticed. For a complete example:

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv

in_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/nyctaxi_2010-01.csv.gz'
out_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/temp/iterative.parquet'

# Pin these two columns to string; type inference would otherwise change its
# mind partway through the file (see the note below)
convert_options = pyarrow.csv.ConvertOptions()
convert_options.column_types = {
    'rate_code': pa.utf8(),
    'store_and_fwd_flag': pa.utf8()
}

writer = None
with pyarrow.csv.open_csv(in_path, convert_options=convert_options) as reader:
    for next_chunk in reader:
        if next_chunk is None:
            break
        if writer is None:
            # Create the Parquet writer lazily, once the schema of the first batch is known
            writer = pq.ParquetWriter(out_path, next_chunk.schema)
        next_table = pa.Table.from_batches([next_chunk])
        writer.write_table(next_table)
writer.close()

This example also highlights one of the challenges the streaming CSV reader introduces: it needs to return batches with consistent data types. However, when parsing CSV you typically need to infer the data types. In my example data the first few MB of the file have integral values for the rate_code column. Somewhere in the middle of the file there is a non-integer value (* in this case) for that column. To work around this issue you can specify the column types up front, as I am doing here.
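
If you do want to control the batch size, the block_size option from the question can also be passed to open_csv through ReadOptions. A minimal sketch (the 1 MB value is just an illustration):

read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)  # read roughly 1 MB of the file per batch
with pyarrow.csv.open_csv(in_path, read_options=read_options,
                          convert_options=convert_options) as reader:
    for next_chunk in reader:
        print(next_chunk.num_rows)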

Truong answered 28/7, 2021 at 15:52 Comment(9)
Hi Pace! I have implemented the approach but ran across a different error. Decided to cover it as a separate question here if you have time =) – Madra
Hi @Pace, I have one question. How is it possible that ParquetWriter.write_table is not simply overwriting the out_path file with the latest chunk? Is the writer opened in 'append' mode? I'm newish to the Parquet format and I can't find any info in the docs. – Accent
@Accent The example creates one instance of ParquetWriter that lives for the lifetime of the scan. A file is opened (once) when the ParquetWriter instance is created. Each call to write_table adds more data to the existing open file. At the end, when writer.close is called, the file is closed. So there is no need for an append in this case, though the Parquet file would not be readable until the entire loop had run and the file had been closed. – Truong
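A tiny sketch of that lifecycle (schema, table_one, and table_two are hypothetical placeholders; the with-block form is equivalent to calling close() at the end):

with pq.ParquetWriter(out_path, schema) as writer:  # file is opened once here
    writer.write_table(table_one)                   # appends row groups to the open file
    writer.write_table(table_two)                   # appends more row groups
# the file is closed (and only then becomes readable) when the with-block exits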
But I don't see where you are passing the block_size param in your example. – Meggy
@Meggy if the question is "how can I read a CSV file part by part to save memory?" then you don't need to use the block_size parameter. The answer is "use open_csv instead of read_csv". It's pretty unusual that you need to care about or mess with block_size and so I left it out of the example. – Truong
Got it @Pace, thank you. I was thinking of trying to leverage this to parallelize some very large gzip files on AWS Glue Spark or Ray. Ray uses pyarrow and (I think) this streaming read under the hood. The issue is that gzip is unsplittable, so one worker has to read it all in before any repartitioning or parallel work can be performed. I may start another question? Spark takes FOREVER to do this, with some reads taking multiple hours (for > 5 GB .gz CSV files). – Meggy
CSV files themselves are kind of unsplittable because you have to hit a line boundary with your split for the pieces to be meaningful. Splitting is also challenging when you are doing type inference. Arrow's streaming reader is not as parallel as the non-streaming reader, but it should at least be efficient and do a better job than a single-core system. Hours for 5 GB seems wrong to me. I would expect parsing that to take less than a minute even without any real parallelism. – Truong
I don't understand it either. These are simple one-line TSV files, gzipped, ranging from 300 MB to 20 GB. They are internet graph files in Common Crawl's open repository on AWS. – Meggy
If you want to speed up the performance, you could put the writing on a background thread while the reads occur in the foreground. This has worked very nicely for me. – Unknow
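A minimal sketch of that background-writer idea (not the commenter's actual code; it is hypothetical and reuses the in_path, out_path, and convert_options names from the answer above, handing batches from the reading loop to a writer thread through a bounded queue):

import queue
import threading

import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet as pq

batch_queue = queue.Queue(maxsize=4)  # bounded so the reader cannot race too far ahead of the writer

def write_batches(schema):
    # Background thread: drain the queue and append each batch to the Parquet file
    with pq.ParquetWriter(out_path, schema) as writer:
        while True:
            batch = batch_queue.get()
            if batch is None:  # sentinel meaning "no more batches"
                break
            writer.write_table(pa.Table.from_batches([batch]))

with pyarrow.csv.open_csv(in_path, convert_options=convert_options) as reader:
    first = reader.read_next_batch()  # read one batch up front to learn the schema
    writer_thread = threading.Thread(target=write_batches, args=(first.schema,))
    writer_thread.start()
    batch_queue.put(first)
    for batch in reader:
        batch_queue.put(batch)
    batch_queue.put(None)  # tell the writer thread to finish
writer_thread.join()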
