How to read large CSV with Beam?

I'm trying to figure out how to use Apache Beam to read large CSV files. By "large" I mean, several gigabytes (so that it would be impractical to read the entire CSV into memory at once).

So far, I've tried the following options:

  • Use TextIO.read(): this is no good because a quoted CSV field could contain a newline. In addition, this tries to read the entire file into memory at once.
  • Write a DoFn that reads the file as a stream and emits records (e.g. with commons-csv). However, this still reads the entire file all at once.
  • Try a SplittableDoFn as described here. My goal with this is to have it gradually emit records as an unbounded PCollection; basically, to turn my file into a stream of records. However, (1) it's hard to get the counting right, (2) it requires some hacky synchronization since ParDo creates multiple threads, and (3) my resulting PCollection still isn't unbounded.
  • Try to create my own UnboundedSource. This seems to be ultra-complicated and poorly documented (unless I'm missing something?).

Does Beam provide anything simple to allow me to parse a file the way I want, and not have to read the entire file into memory before moving on to the next transform?

Versify answered 20/7, 2018 at 9:17 Comment(1)
I created an issue in Apache JIRA asking the team to add a CSVIO for Apache Beam: issues.apache.org/jira/browse/BEAM-10030 - Kynthia

TextIO should be doing the right thing from Beam's perspective, which is to read the text file as fast as possible and emit events to the next stage.

I'm guessing you are using the DirectRunner for this, which is why you are seeing a large memory footprint. Hopefully this isn't too much explanation: the DirectRunner is a test runner for small jobs, so it buffers intermediate steps in memory rather than on disk. If you are still testing your pipeline, you should use a small sample of your data until you think it is working. Then you can use the Apache Flink runner or the Google Cloud Dataflow runner, both of which will write intermediate stages to disk when needed.

Roundabout answered 23/7, 2018 at 14:13 Comment(7)
I know that the DirectRunner does things locally. What I'm asking is: what if I have a huge file and the JVM needs 100 GB of memory to read it all in at once? Then whatever runner I'm using has to be able to scale up to 100 GB of memory usage, when it would be simpler (and cheaper) to stream the data and process it continuously. - Versify
The DirectRunner isn't just local, it is in-memory only. It cannot apply back pressure when data is read faster than it can be processed, and it cannot buffer intermediate data to disk. TextIO reads one record at a time from the file, but it is very efficient and might have multiple threads reading at once: github.com/apache/beam/blob/… - Roundabout
OK, so basically what you're saying is that the different transforms behave differently depending on where they're running, and TextIO will "stream" its results using an unbounded PCollection? Will I get the same behavior if I use Commons-CSV? Right now my naïve implementation is: CSVParser records = CSVParser.parse(inputStream, StandardCharsets.UTF_8, format); for(CSVRecord record : records) { c.output(record); } - Versify
In other words, if I do it this way, will some runners send a batch of CSVRecords on to the next transform before the entire file has been read? - Versify
Kricket, I don't know if you solved this, but what Andrew mentions is true. The implementation of the pipeline changes completely depending on the runner you are using. The direct runner is a tool for prototyping and testing with small datasets, as it loads everything in memory. Production runners like Flink or Dataflow use a "work preparation stage" that splits the input and arranges several threads to read it, and they use secondary storage (i.e. your filesystem) to avoid memory problems. Back to your CSV question: did you find a suitable solution for reading multiline CSVs? - Brott
But your question still remains regarding how to read multiline CSVs using a native Beam solution (I have not found one, yet it should exist given how widely CSVs are used). In my comment above I only stated that with a "production runner" you will probably avoid memory problems despite reading from huge files, so don't worry about IO performance or memory. Your original question remains, though: how do you read multiline CSVs? That I haven't found, sorry. Have you solved this problem since your original post? - Brott
@tony_008 No, I never found a way to use a "real" CSV parser to handle, e.g., quoted fields with newlines. IIRC we arranged for incoming CSVs to not have any newlines in the records, so reading one line at a time was OK. - Versify

In general, splitting CSV files with quoted newlines is hard, as it may require arbitrary look-back to determine whether a given newline is or is not in a quoted segment. If you can arrange for the CSV to have no quoted newlines, TextIO.read() works well. Otherwise:

  • If you're using the Beam Python SDK, consider the dataframe operation apache_beam.dataframe.io.read_csv, which will handle quoting correctly (and efficiently).
  • In another language, you can either use that as a cross-language transform, or create a PCollection of file paths (e.g. via FileIO.MatchAll) followed by a DoFn that reads and emits rows incrementally using your CSV library of choice. With the exception of a direct/local runner, this should not require reading the entire file into memory (though it will cause each individual file to be read by a single worker, possibly limiting parallelism).
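
As a rough illustration of the second option, here is a minimal Python sketch of the "read and emit rows incrementally" part, using only the standard library csv module. The names iter_csv_rows and read_rows_from_path are hypothetical helpers made up for this example, not Beam APIs, and the sample data is invented. It also shows why naive line splitting miscounts records when a quoted field contains a newline.

```python
import csv
import io
from typing import IO, Iterator, List


def iter_csv_rows(text_stream: IO[str]) -> Iterator[List[str]]:
    """Yield one parsed CSV record at a time from an open text stream.

    csv.reader pulls lines from the stream lazily, so only the current
    record (which may span several physical lines) is parsed at a time.
    """
    yield from csv.reader(text_stream)


def read_rows_from_path(path: str) -> Iterator[List[str]]:
    """Hypothetical per-file callable (assumes a local, UTF-8 encoded file)."""
    # newline="" lets the csv module see embedded newlines unmodified.
    with open(path, newline="", encoding="utf-8") as f:
        yield from iter_csv_rows(f)


# Invented sample: the second record has a newline inside a quoted field.
sample = 'id,comment\n1,"first line\nsecond line"\n2,plain\n'

# Naive line splitting cuts the quoted field into two physical lines.
physical_lines = sample.splitlines()

# The streaming parser keeps the quoted newline inside a single record.
rows = list(iter_csv_rows(io.StringIO(sample, newline="")))

print(len(physical_lines), "physical lines vs", len(rows), "CSV records")
```

In a Beam Python pipeline, a generator like read_rows_from_path could presumably be applied with beam.FlatMap to a PCollection of file paths, so that rows are emitted as they are parsed. The built-in open call only covers local paths; reading from a remote filesystem would need Beam's own file APIs instead.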
Cravens answered 15/8, 2022 at 22:14 Comment(0)

You can use the logic in the "Text to Cloud Spanner" template for handling newlines while reading a CSV.

This template reads data from a CSV and writes to Cloud Spanner.

The specific files containing the logic to read CSV with newlines are in ReadFileShardFn and SplitIntoRangesFn.

Conceptionconceptual answered 19/12, 2022 at 8:43 Comment(0)
