Pyarrow Write/Append Columns Arrow File

I have a calculator that iterates over a couple of hundred objects and produces an Nx1 array for each of them, with N being 1-10m depending on configuration. Right now I sum over these using a generator expression, so memory consumption is low. However, I would like to store the Nx1 arrays to file so I can do other computations (compute quantiles, partial sums etc., pandas style). Preferably I would like to use pa.memory_map on a single file (in order not to load the dataframes into memory), but I cannot see how I can produce such a file without generating the entire result first. (Monte Carlo results on 200-500 x 10m floats.)

If I understand correctly, RecordBatchStreamWriter needs a part of the entire table, and I cannot produce only such a part: the parts my calculator produces are whole columns, not row slices across all columns. Is there any way of writing "columns" one by one? Either by appending, or by creating an empty Arrow file that can be filled afterwards? (The schema is known.)

As I see it, my alternative is to write several files and use "dataset"/tabular data to "join" them together. My "other computations" would then have to filter or pull parts into memory, as I can't see in the docs that dataset() works with memory_map. The result set is too big to fit in memory (at least on the server it is running on).

I'm on day 2 of digging through the docs and trying to understand how it all works, so apologies if the "lingo" is not all correct.

On further inspection, it looks like all files used in dataset() must have the same schema, so I cannot split "columns" into separate files either, can I?

EDIT: After wrestling with this library, I now produce single-column files which I later combine into a single file. However, when following the suggested solution, visible memory consumption (Task Manager) skyrockets in the step that combines the files. I would expect a peak for every "row group" or combined record batch, but instead it increases steadily until all memory is used. A snippet of this step:

    # One streaming reader per single-column temp file, one writer for the
    # combined output file.
    readers = [pa.ipc.open_stream(file) for file in self.tempfiles]
    combined_schema = pa.unify_schemas([r.schema for r in readers])

    with pa.ipc.new_stream(
        os.path.join(self.filepath, self.outfile_name + ".arrow"),
        schema=combined_schema,
    ) as writer:
        # Pull one record batch from each reader, stitch the partial columns
        # into a combined batch and write it out.
        for group in zip(*readers):
            combined_batch = pa.RecordBatch.from_arrays(
                [g.column(0) for g in group], names=combined_schema.names
            )
            writer.write_batch(combined_batch)

From this link I would expect running memory consumption to be roughly that of combined_batch plus some overhead.

Doretha answered 24/2, 2022 at 18:50 Comment(0)

You could do the write in two passes (sketched below).

  • First, write each column to its own file. Make sure to set a row group size small enough that a table consisting of one row group from each file comfortably fits into memory.
  • Second, create a streaming reader for each file you created and one writer. Read a single row group from each one. Create a table by combining all of the partial columns and write the table to your writer. Repeat until you've exhausted all your readers.
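
Roughly, the two passes could look like the sketch below; write_column, combine, the paths and rows_per_batch are made-up names, and it assumes equal-length columns all written with the same number of rows per batch.

    import pyarrow as pa

    # Pass 1: write each Nx1 result to its own IPC stream file, split into
    # small record batches via max_chunksize (a row count), so that one batch
    # per file comfortably fits in memory later.
    def write_column(path, name, values, rows_per_batch=1_000_000):
        table = pa.table({name: values})
        with pa.ipc.new_stream(path, table.schema) as writer:
            writer.write_table(table, max_chunksize=rows_per_batch)

    # Pass 2: one streaming reader per column file and one writer for the
    # combined file; read a batch from each reader, stitch the partial
    # columns together, write, and repeat until the readers are exhausted.
    def combine(column_files, out_path):
        readers = [pa.ipc.open_stream(pa.OSFile(f)) for f in column_files]
        schema = pa.unify_schemas([r.schema for r in readers])
        with pa.ipc.new_stream(out_path, schema) as writer:
            for batches in zip(*readers):
                writer.write_batch(
                    pa.RecordBatch.from_arrays(
                        [b.column(0) for b in batches], schema=schema
                    )
                )

Peak memory should then stay around one record batch per column plus the combined batch currently being written.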

I'm not sure that memory mapping is going to help you much here.

Elegiac answered 25/2, 2022 at 8:6 Comment(20)
What you are describing here is the process of generating a single file, correct? I guess I can then do a memory_map on the resulting file, and do a to_numpy() in order to have access to the full dataset without pulling it into memory? (No null/na, equal length columns/arrays.) Is there no way of combining all the single files, i.e. memory_map all files and combine them? I saw this post where a guy made a memory mapped file, fed it to RecordBatchFileReader, read all and did to_pandas(). Could I do something similar on all files, combine them, and then do a to_pandas/to_numpy?Doretha
Yes this would generate a single file. Yes, you could memory map the resulting file and call to_numpy() on it. It would basically be the same as declaring a very large swap file. Any columns that can't be zero-copy converted to pandas (e.g. strings) would be read into RAM immediately during the to_pandas call but those columns that do support zero-copy would remain on disk until accessed. You can generally do the same without memory mapping by only reading in the columns you are going to need to access.Elegiac
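
For illustration, a minimal sketch of the memory-mapped read-back, assuming the combined file was written with the IPC stream format and contains only numeric, null-free columns; the path and column name are placeholders:

    import pyarrow as pa

    # Placeholder path to the combined file (IPC stream format).
    source = pa.memory_map("results.arrow", "r")
    table = pa.ipc.open_stream(source).read_all()

    # A primitive (float/int) column without nulls can be viewed as numpy
    # without copying as long as it consists of a single chunk; converting a
    # multi-chunk column copies.
    col = table.column("col_0")
    if col.num_chunks == 1:
        values = col.chunk(0).to_numpy()  # zero-copy view onto the mapping
    else:
        values = col.to_numpy()           # concatenation allocates
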
Thanks, that last question was about the possibility of skipping the whole "make one file" process. Thanks though. In many cases I need to sum over all columns (row-wise), so pulling some columns only solves some of my use cases. Also, I have not found a way to do rowsums on record batches/tables in pyarrow. I guess I could do group_by on long-format data instead, but that would duplicate data immensely. Field names x 1-10m.Doretha
Sorry, I did not find any settings for row group size?Doretha
What method are you using to write your table? pyarrow.parquet.write_table has row_group_size. pyarrow.ipc.RecordBatchFileWriter.write_table has max_chunksize. pyarrow.dataset.write_dataset has max_rows_per_groupElegiac
I made an np.array (Nx1), then a RecordBatch out of it. I then did ipc.new_file() with a path and schema, and write_batch(). I'm staying clear of parquet and using the IPC format. Do I need to do write_table instead (make a table of a single column)? And max_chunksize, is that a size in number of "rows"?Doretha
Yes, there probably should be a max_chunksize argument for write_batch too (feel free to file a JIRA) but at the moment you can only specify it when writing a table. So you will need to make a table from a single column. If you have a record batch then wrapping it as a table should be a pretty cheap operation so I wouldn't expect much performance difference between the two. Yes, max_chunksize is a number of rows.Elegiac
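
In case it helps, a minimal sketch of that wrapping, with made-up column name, file name, and chunk size:

    import numpy as np
    import pyarrow as pa

    # Hypothetical single Nx1 result column.
    values = np.random.default_rng(0).normal(size=10_000_000)
    batch = pa.RecordBatch.from_arrays([pa.array(values)], names=["col_0"])

    # Wrapping the batch as a table is cheap (no data copy); write_table then
    # accepts max_chunksize, a row count per written record batch.
    table = pa.Table.from_batches([batch])
    with pa.ipc.new_stream("col_0.arrow", table.schema) as writer:
        writer.write_table(table, max_chunksize=1_000_000)
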
So... I read in "row groups", make individual arrays from the resulting Table reads, and then make a combined Table with all my columns/arrays. Iterating over all groups, I call write_table for each iteration. As far as I can see, the result when reading back the combined file is a chunked array, which will require a copy (arr.num_chunks == 1 is needed for zero-copy): arrow.apache.org/docs/python/pandas.html. Was this your suggestion, or am I still doing this wrong?Doretha
Ok, so I now understand that an entire dataframe cannot be memory mapped with zero-copy, only individual columns/series.Doretha
My thinking was that you could then process it in an iterative fashion. Instead of converting the entire file to a single pandas dataframe you can convert each batch to a dataframe, compute the resulting series for that batch, and convert that back. In the end you would have a chunked array as your result and no copies would be performed. If you want the entire table as a single pandas dataframe then my suggestion won't work.Elegiac
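
A rough sketch of that iterative pattern, assuming the combined file was written with the IPC stream format; the path and the per-batch computation are placeholders:

    import pyarrow as pa

    # Placeholder path to the combined IPC stream file.
    reader = pa.ipc.open_stream(pa.memory_map("results.arrow", "r"))

    result_chunks = []
    for batch in reader:
        df = batch.to_pandas()       # only this batch is materialised
        series = df.mean(axis=1)     # placeholder per-batch computation
        result_chunks.append(pa.Array.from_pandas(series))

    # The end result is a chunked array built from the per-batch results.
    result = pa.chunked_array(result_chunks)
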
In theory it might be possible to create an IPC file with a single record batch which is larger than the server's RAM. I don't think pyarrow would be able to create such a file however. Its writers need an in-memory record batch as input.Elegiac
Hmm, sad. For one of my use cases this would work; there I only work on individual series based on some aggregated result. For the other use case I need to do a bunch of rowsums, summing across "columns" (like pd.sum with axis=1). For this, I guess the performance would be absolutely terrible. I could write a generator function that yielded each column and accumulated the sum, but it just sounds overly complicated.Doretha
I have not tried the compute functions. Maybe I'll have a go at skipping pandas altogether, but I guess I will have to go "long format" on the data. If my record batches were "id" (int), "index" (int), "value" (float), I guess I could do group_by on index and sum. I would be writing tons of unnecessary duplicates, but maybe the performance gain would be worth it. Any experience with it? Would pyarrow be able to do the sum without pulling the data into memory? Worst case I could be looking at values amounting to 40 GB, and adding index and id makes that times three, I guess.Doretha
Group by on larger-than-memory data is experimental in the C++ implementation at the moment and not exposed in the python implementation. 7.0.0 just recently added support for in-memory aggregation but that won't work for you. I'm not sure why the iterative approach doesn't work for rowsums. You would process the large file (all columns and many record batches) one record batch at a time. For each record batch you could compute the rowsums (output would be chunked array).Elegiac
Performance won't be great on rowsums in general because arrow is going to be storing the data in a column-major fashion.Elegiac
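
For rowsums specifically, a per-batch sketch in pure pyarrow could look like this (the path is a placeholder, and it assumes only float columns of equal length):

    import pyarrow as pa
    import pyarrow.compute as pc

    def rowsums(path):
        chunks = []
        reader = pa.ipc.open_stream(pa.memory_map(path, "r"))
        for batch in reader:
            # Sum the columns of this batch element-wise; pc.add allocates new
            # arrays, so only one batch worth of sums is held at a time.
            total = batch.column(0)
            for i in range(1, batch.num_columns):
                total = pc.add(total, batch.column(i))
            chunks.append(total)
        # The overall result is a chunked array, one chunk per record batch.
        return pa.chunked_array(chunks)
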
One efficient way to represent your data might be a "single column of 1D tensors" but pyarrow doesn't have a lot of compute functions in place for tensors today.Elegiac
I see. Yup, went down the route of long data and group_by. Memory mapped the file and read_all(). At the point where I start the aggregation, memory consumption starts growing, so it's a no-go. An informal speed test on rowsums: with a numpy array of 10e6 x 100, about 1 sec; pandas about 3 times slower; group_by on the memory-mapped file about 50 secs. I guess it is not only compute time, but also reading it into memory.Doretha
Running the model which produces the output takes about 10 secs, so running it 200+ times is half an hour of computing time if I were to skip files altogether. I imagine your solution of pulling parts into memory will be more efficient than rerunning my model with different inputs. It would be great if I could omit the whole single-files-to-one-file process and do "column" writing, although the suggested scheme works well if I go multiprocessing. Thanks for the time spent with me reasoning about the issue at hand.Doretha
Hi @Pace, not getting this to work as intended. Do you have any comments on my take?Doretha
Apologies, I did not see your update regarding memory usage. I suspect what you are seeing is the writes filling up the disk cache on the OS. There is an open issue to allow for a "cache skipping" write but it hasn't been implemented yet.Elegiac
