Repartitioning parquet-mr generated Parquet files with pyarrow/parquet-cpp increases file size by 30x?

Using AWS Firehose I am converting incoming records to parquet. In one example, I have 150k identical records enter Firehose, and a single 30kb parquet gets written to S3. Because of how Firehose partitions data, we have a secondary process (a lambda triggered by an S3 put event) that reads in the parquet and repartitions it based on the date within the event itself. After this repartitioning process, the 30kb file size jumps to 900kb.

Inspecting both parquet files (a rough way to script this check is sketched after the list):

  • The meta doesn't change
  • The data doesn't change
  • They both use SNAPPY compression
  • The firehose parquet is created by parquet-mr; the pyarrow-generated parquet is created by parquet-cpp
  • The pyarrow-generated parquet has additional pandas headers
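
A rough sketch of that comparison using pyarrow (the file names below are placeholders for the two files, not the actual S3 objects):

import pyarrow.parquet as pq

# Placeholder paths standing in for the firehose and repartitioned files
for label, path in [('firehose', 'firehose.parquet'), ('repartitioned', 'repartitioned.parquet')]:
    meta = pq.ParquetFile(path).metadata
    first_col = meta.row_group(0).column(0)
    # Writer library, row count, and compression codec of the first column chunk
    print(label, meta.created_by, meta.num_rows, first_col.compression)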

The full repartitioning process:

import pyarrow.parquet as pq

# TMP_DIR, rand_string, s3_client, firehose_bucket, key and
# local_partitioned_dir are defined elsewhere in the lambda
tmp_file = f'{TMP_DIR}/{rand_string()}'
s3_client.download_file(firehose_bucket, key, tmp_file)

# Read the Firehose-written parquet into an Arrow table
pq_table = pq.read_table(tmp_file)

# Re-write it locally, partitioned by the event's date columns
pq.write_to_dataset(
    pq_table,
    local_partitioned_dir,
    partition_cols=['year', 'month', 'day', 'hour'],
    use_deprecated_int96_timestamps=True
)

I imagine there would be some size change, but I was surprised to find such a big difference. Given the process I've described, what would cause the source parquet to go from 30kb to 900kb?

Eviaevict answered 26/10, 2018 at 16:38 Comment(2)
It's going to be hard for us to say why without a reproducible example. I can't think of any reason why off the top of my head. – Swarth
This could simply be down to the number of files created. There is a fixed overhead of 4kb+ per Parquet file. When you repartition into too many files, this could be one of the sources. – Worldshaking

Parquet uses different column encodings to store low entropy data very efficiently. For example:

  • It can use delta encoding to only store differences between values. For example, 9192631770, 9192631773, 9192631795, 9192631797 would be stored effectively as 9192631770, +3, +22, +2.
  • It can use dictionary encoding to refer to common values compactly. For example, Los Angeles, Los Angeles, Los Angeles, San Francisco, San Francisco would be stored as a dictionary of 0 = Los Angeles, 1 = San Francisco and the references 0, 0, 0, 1, 1
  • It can use run-length encoding to only store the number of repeating values. For example, Los Angeles, Los Angeles, Los Angeles would be effectively stored as Los Angeles×3. (Actually as far as I know pure RLE is only used for boolean types at this moment, but the idea is the same.)
  • A combination of the above, specifically RLE and dictionary encoding. For example, Los Angeles, Los Angeles, Los Angeles, San Francisco, San Francisco would be stored as a dictionary of 0 = Los Angeles, 1 = San Francisco and the references 0×3, 1×2

With the 3 to 5 values of the examples above, the savings are not that significant, but the more values you have, the bigger the gain. Since you have 150k identical records, the gains will be huge: with RLE and dictionary encoding, each column value only has to be stored once and then marked as repeating 150k times.
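
To get a feel for the magnitude, here is a minimal sketch (synthetic data, not the data from the question; file paths and the column name are made up) that writes the same highly repetitive column with and without dictionary encoding and compares the resulting file sizes. Compression is turned off so that only the encoding makes the difference:

import os
import pyarrow as pa
import pyarrow.parquet as pq

# 150k identical values, similar in spirit to the records in the question
table = pa.table({'city': ['Los Angeles'] * 150_000})

# Same data, written with and without dictionary encoding;
# compression disabled to isolate the effect of the encoding
pq.write_table(table, '/tmp/dict.parquet', use_dictionary=True, compression='NONE')
pq.write_table(table, '/tmp/plain.parquet', use_dictionary=False, compression='NONE')

print(os.path.getsize('/tmp/dict.parquet'))   # tiny: one dictionary entry plus repeated references
print(os.path.getsize('/tmp/plain.parquet'))  # far larger: every value is stored in full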

However, it seems that pyarrow does not use these space-saving encodings. You can confirm this by taking a look at the metadata of the two files using parquet-tools meta. Here is a sample output:

file schema: hive_schema 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
name:        OPTIONAL BINARY O:UTF8 R:0 D:1

row group 1: RC:61 TS:214 OFFSET:4 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id:           INT32 UNCOMPRESSED DO:0 FPO:4 SZ:107/107/1.00 VC:61 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: 1, max: 5, num_nulls: 0]
name:         BINARY UNCOMPRESSED DO:0 FPO:111 SZ:107/107/1.00 VC:61 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: Los Angeles, max: San Francisco, num_nulls: 0]

The encoding is shown as ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY.
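
If parquet-tools is not at hand, roughly the same information can be pulled from Python through pyarrow's metadata objects (the file path below is a placeholder):

import pyarrow.parquet as pq

meta = pq.ParquetFile('some_file.parquet').metadata
for rg in range(meta.num_row_groups):
    for col in range(meta.num_columns):
        chunk = meta.row_group(rg).column(col)
        # encodings is a tuple of strings such as ('PLAIN_DICTIONARY', 'PLAIN', 'RLE')
        print(chunk.path_in_schema, chunk.encodings)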

Chivalrous answered 30/10, 2018 at 10:50 Comment(6)
This makes a lot of sense – thank you! However, in this example it only attempts to repartition; the data doesn't actually split. 1 file -> repartition -> 1 file. So somewhere in the repartitioning process it's not compressing efficiently? – Eviaevict
Hmm, indeed, with identical records the partition will be the same as well, I should have realized that. :) In that case my only guess is that pyarrow did not or cannot use any of these space-saving encodings for some reason. Could you post the output of parquet-tools meta for both files? I expect to see different encodings for the two files. – Chivalrous
You were correct about the compression method. Pandas doesn't currently, and may never, support this kind of compression. So if you have 1000 NULL values, pandas will actually populate 1000 NULL values instead of making a note of "the next 1000 are null". – Eviaevict
What I wrote mainly applies to regular values, as NULL values are handled somewhat specially. Nevertheless, the lack of support for some encodings can certainly inflate file sizes. – Chivalrous
Hopefully that didn't sound like it was contradicting you. NULL values are handled specially in parquet files, but the pandas library doesn't leverage this in its implementation of parquet, resulting in the much larger file size after passing in and out of pandas. – Eviaevict
No, I just wanted to point out that it's an additional intricacy that NULL values are handled differently. – Chivalrous
