Dealing with a large gzipped file in Spark

I have a large (about 85 GB compressed) gzipped file from S3 that I am trying to process with Spark on AWS EMR (currently with an m4.xlarge master instance and two m4.10xlarge core instances, each with a 100 GB EBS volume). I am aware that gzip is a non-splittable file format, and I've seen it suggested that one should repartition the compressed file because Spark initially gives an RDD with one partition. However, after doing

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields]
scala> raw.count()

and taking a look at the Spark application UI, I still see only one active executor (the other 14 are dead) with one task, and the job never finishes (or at least I've not waited long enough for it to).

  • What is going on here? Can someone help me understand how Spark is working in this example?
  • Should I be using a different cluster configuration?
  • Unfortunately, I have no control over the mode of compression, but is there an alternative way of dealing with such a file?
Beaty asked 8/11, 2016 at 17:26 Comment(0)

If the file format is not splittable, then there's no way to avoid reading the file in its entirety on one core. In order to parallelize work, you have to know how to assign chunks of work to different computers. In the gzip case, suppose you divide it up into 128 MB chunks. The nth chunk depends on the (n-1)th chunk's position information to know how to decompress, which depends on the (n-2)th chunk, and so on down to the first.

If you want to parallelize, you need to make this file splittable. One way is to unzip it and process it uncompressed, or you can unzip it, split it into several files (one file for each parallel task you want), and gzip each file.
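
A minimal sketch of that second approach in Python, run outside Spark (the file names and the lines-per-chunk value are arbitrary placeholders, not anything Spark requires):

import gzip
from itertools import islice

LINES_PER_CHUNK = 1_000_000  # arbitrary; roughly one output file per parallel task you want

# Stream the big gzip file and rewrite it as many smaller gzip files,
# each of which Spark can then read in its own task.
with gzip.open('file.tsv.gz', 'rt') as src:
    part = 0
    while True:
        chunk = list(islice(src, LINES_PER_CHUNK))
        if not chunk:
            break
        with gzip.open(f'part-{part:05d}.tsv.gz', 'wt') as dst:
            dst.writelines(chunk)
        part += 1

The zcat file | split -l 1000000 approach mentioned in the comments below does the same thing from the shell and is usually the quicker route for a one-off job.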

Purington answered 8/11, 2016 at 17:55 Comment(5)
I was under the impression that Spark decompresses the file first before repartitioning it. Is this not the case? What are the four links that I provided talking about, then? – Beaty
Yes, Spark decompresses the file first in its entirety (80G on one core) before it can shuffle it to increase parallelism. – Purington
Okay, thank you. Do you think my cluster will even be able to handle this task? If so, if I want to decompress the whole file, repartition it, and then do further processing, do you think setting spark.dynamicAllocation.enabled=true will ensure that I get one executor (with as much memory as possible) to do the decompression and then more executors (with less memory but many cores) afterwards to do the processing? – Beaty
This is something you don't need to (and shouldn't) do in Spark. Just do something like zcat file | split -l 1000000 to produce many new files, then recompress each one and go from there. – Purington
The gzip file is not easily splittable, but it can still be processed in parallel with Spark, increasing end-to-end throughput. Check out the brilliant SplittableGZip codec, which trades CPU hours for wall clock gains: github.com/nielsbasjes/splittablegzip (and see the answer below). – Rathskeller

Spark cannot parallelize reading a single gzip file.

The best you can do is split it into chunks that are gzipped.

However, Spark is really slow at reading gzip files. You can do this to speed it up:

import gzip

file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda path: gzip.open(path, 'rt').readlines())

Going through Python is twice as fast as the native Spark gzip reader.
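
One caveat worth adding (my note, not the original answer's): gzip.open only works on paths the executors can open directly (local or mounted storage, not s3:// URLs), and the partitioning follows the file list rather than the data volume, so files of very different sizes give skewed partitions. As the comments note, a repartition evens the load before further parsing; a sketch with arbitrary numbers:

# Even out per-file skew, then parse the tab-separated lines (str because of 'rt' mode).
balanced = lines_rdd.repartition(200)  # partition count is an arbitrary choice
rows = balanced.map(lambda line: line.rstrip('\n').split('\t'))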

Phenica answered 13/10, 2018 at 15:43 Comment(2)
So will this split the contents of the .gz files equally across the nodes if I further repartition lines_rdd? – Multiplicand
To make it equal, you may need to repartition, unless your source gzips are all of similar size. – Phenica

The solution I've used is the decompression codec SplittableGZipCodec by Niels Basjes. This codec feeds the same file to multiple Spark tasks. Each task 'fast forwards' through the gzip stream to its assigned split and only then starts emitting records, so multiple tasks work on the same gzip file concurrently. This significantly decreases the wall clock time and increases the chance the gunzip succeeds at all, at the small cost of increasing the total core hours used. Brilliant. I've tested it on 20-50GB files.

The Spark solution is described in detail here: https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md

# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )
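
One practical note (my addition, not something from the README): the codec's jar has to be on the classpath before the read. Assuming the Maven coordinates are nl.basjes.hadoop:splittablegzip (the version below is a guess; check the project's releases), one way is to let Spark fetch it at startup via spark.jars.packages:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Fetch the splittable-gzip codec from Maven at startup
    # (artifact coordinates and version are assumptions).
    .config('spark.jars.packages', 'nl.basjes.hadoop:splittablegzip:1.3')
    .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
    .getOrCreate()
)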
Rathskeller answered 11/10, 2021 at 0:34 Comment(3)
This solution is currently broken since Spark 3.0.1; see the comments in the readme file. – Craniometer
I think this option must be working again with the current version of Spark and wanted to revisit. But I'm getting an error when trying to use it according to the PySpark docs github.com/nielsbasjes/splittablegzip/blob/main/README-Spark.md on AWS Glue. I'm not sure how to pass the codec to Spark in the job definition. I get: pyspark.sql.utils.IllegalArgumentException: Compression codec nl.basjes.hadoop.io.compress.SplittableGzipCodec not found. – Rockafellow
My colleague at Databricks has this codec running on DBR 11.3 (Spark 3.3.0/Scala 2.12) and wrote about the experience: medium.com/@rahuljax26/autoloader-cookbook-part-1-d8b658268345 – Rathskeller

I have faced this problem, and here is my solution.

The best way to approach it is to unzip the .gz file before the Spark batch runs and then point Spark at the uncompressed file, so that Spark's parallelism can be used.

Code to unzip the .gz file:

import gzip
import shutil

# Decompress file.txt.gz into file.txt
with gzip.open('file.txt.gz', 'rb') as f_in, open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
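
Once the file is uncompressed, the read splits across many tasks; a minimal sketch, assuming a tab-delimited file like the one in the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A plain text file is splittable, so this read produces many partitions.
df = spark.read.option('delimiter', '\t').csv('file.txt')
print(df.rdd.getNumPartitions())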
Printery answered 25/3, 2018 at 2:8 Comment(0)

I struggled with this challenge for a while, so I decided to run an experiment. I'm using Spark on AWS Glue, but I think this applies to a Spark cluster on EMR as well.

I've noticed several questions on SO with similar challenges (see below). All of us seem to struggle with large gzipped CSV or JSON files. As the top answer above points out, these are unsplittable. My experiments also show that, as it says, the best way to deal with them is to process them uncompressed, or to split them and recompress them.

Here is my approach:

  • I timed a read of a 750 MB gzipped CSV file (not very large, but it keeps my test fast; I think the results only magnify the differences as file size grows and as workers are added).
  • I used AWS Glue's DynamicFrame as well as the generic Spark CSV reader.
  • For each experiment, I did light processing, including a show, a printSchema, and materializing to a count.
  • For the unsplittable gzip reads that resulted in a single partition, I repartitioned to simulate the added step that is needed to split the data into partitions, for example before writing to Parquet.

My file was about 75% compressed so the resulting decompressed file was around 2.7GB.

My results were:

  1. Fastest was to use straight Spark or the AWS Glue extended DynamicFrame preceded by a download-and-decompress step. Both were dramatically faster than having Spark read the gzip file directly. The Spark frame built from the downloaded, decompressed file had 21 partitions (the DynamicFrame had 42) vs. the 1 partition that results from an unsplittable gzip file.

  2. The AWS DynamicFrame reading a gzip file without first decompressing it took around 40 minutes, vs. less than a minute for either Spark or the AWS DynamicFrame on the decompressed file!

  3. When I tried AWS Glue's DynamicFrame "optimizePerformance" option for faster SIMD CSV reading, the DynamicFrame never finished processing, even though my tab-delimited CSV seemed to meet the spec.

For my source file, I used a vertices file from the commoncrawl public repository:

"s3://commoncrawl/projects/hyperlinkgraph/cc-main-2018-feb-mar-apr/domain/cc-main-2018-feb-mar-apr-domain-vertices.txt.gz"

This is a simple tab-delimited file with one record per line and two columns: a from_id and a to_id.

Here are my split times (in seconds):

                 splits.t0  splits.t_d  ...  splits.t_t   total
task                                    ...                    
df_read_gz             NaN         NaN  ...         NaN    69.0
dyf_read_gz            NaN         NaN  ...         NaN  2266.0
dyf_read_decomp        NaN        50.0  ...         NaN    50.0
df_read_decomp         NaN        50.0  ...         NaN    50.0

df  = spark.read.csv(options)
dyf = glue_context.create_dynamic_frame.from_options(options)

each run with and without a download/decompress step ahead of the read/transform. All times include download/decompress, extract, and transform time. The single-partition results from the unsplittable gzip reads include a repartition step.
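
For context, a minimal sketch of what those two read paths look like in PySpark on Glue (the bucket/path, separator, and GlueContext setup are my assumptions, not the exact experiment code):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)
spark = glue_context.spark_session

# Plain Spark CSV reader (the df_read_* rows): a single .gz source yields one partition.
df = spark.read.option('delimiter', '\t').csv('s3://bucket/path/vertices.txt.gz')

# Glue DynamicFrame reader (the dyf_read_* rows).
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={'paths': ['s3://bucket/path/vertices.txt.gz']},
    format='csv',
    format_options={'separator': '\t'},
)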

The test setup was AWS Glue 4.0 running on Docker.

Related SO Questions:

  1. Using AWS Glue to convert very big csv.gz (30-40 gb each) to parquet
  2. AWS Glue Crawler - Reading a gzip file of csv
  3. How do I split / chunk Large JSON Files with AWS glueContext before converting them to JSON?
  4. Dealing with a large gzipped file in Spark
Rockafellow answered 10/2 at 2:0 Comment(0)
