How to set "zstd" compression level in AWS Glue job?

Background

"zstd" compression codec has 22 compression levels. I read this Uber blog. Regarding compressing time and file size, I verified using df.to_parquet with our data and got same experiment result. So I am hoping to set compression level to 19 in our AWS Glue Spark job which also writes the data to Delta Lake.

Experiment 1

My AWS Glue job uses the "Glue 4.0 - Spark 3.3, Scala 2, Python 3" version.

Here is my code:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://my-bucket/data/raw-parquet/motor/"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

additional_options = {
    "path": "s3://my-bucket/data/delta-tables/motor/",
    "mergeSchema": "true",
}
sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()

job.commit()

Based on https://mcmap.net/q/1026616/-how-to-change-zstd-compression-level-for-files-written-via-spark, I may be able to use --conf parquet.compression.codec.zstd.level=19. (Note that the author of that answer said it does not seem to work. On the other hand, Uber made it work in the blog, so I am thinking there could be a way to set the "zstd" compression level correctly in Spark.)

Here is my --conf:

--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19

I added these configs to my Glue job via "Job details -> Advanced properties -> Job parameters":

  • Key: --conf
  • Value: spark.sql.parquet.compression.codec=zstd --conf parquet.compression.codec.zstd.level=19

(This is the current way to set multiple --conf values in an AWS Glue job, which I have previously verified works as expected.)
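For completeness, the same settings could also be applied from inside the script instead of through job parameters. This is only a sketch: spark.sql.parquet.compression.codec is a documented Spark SQL option, while parquet.compression.codec.zstd.level is assumed to be a parquet-mr Hadoop property, and I have not verified that the Glue write path honors it.

# Sketch (untested in Glue): apply the same settings programmatically.
# "spark" and "sc" are the objects created earlier in the job script.
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
# Push the assumed parquet-mr property into the Hadoop configuration.
sc._jsc.hadoopConfiguration().set("parquet.compression.codec.zstd.level", "19")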

I compared it with compression level 3. However, both compression levels 19 and 3 generated the exact same parquet file size of 97 MB (97,002,126 bytes) in the Delta table.

To make sure different "zstd" compression levels actually produce different file sizes on this data, I tried this Python code:

df.to_parquet(
  local_parquet_path,
  engine="pyarrow",
  compression="zstd",
  compression_level=19
)

The file size at compression level 19 is 92% of the file size at compression level 3, so for this data, very different compression levels do produce different file sizes. This makes me think --conf parquet.compression.codec.zstd.level=19 in Spark does not function as expected.
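For reference, here is a minimal sketch of the local size comparison (the file names are placeholders; df is the same motor data loaded locally with pandas):

import os

import pandas as pd

# Placeholder: load the same motor data locally.
df = pd.read_parquet("motor_sample.parquet")

sizes = {}
for level in (3, 19):
    path = f"motor_zstd_level_{level}.parquet"
    df.to_parquet(path, engine="pyarrow", compression="zstd", compression_level=level)
    sizes[level] = os.path.getsize(path)

# For this data, level 19 came out at roughly 92% of the level-3 size.
print(sizes, sizes[19] / sizes[3])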

How can I set the "zstd" compression level in an AWS Glue job? Thanks!


Experiments 2 ~ 4 (9/29/2023)

Tried these combinations:

Experiment 2:

--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19

Experiment 3:

--conf spark.sql.parquet.compression.codec=zstd
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19

Experiment 4:

--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19

I still got the exact same "zstd" parquet file size in the Delta table compared to not setting any compression level or setting it to 3.
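One check I can still add, to rule out the job parameters simply not reaching the session, is to print the effective configuration from inside the running job (sketch; spark and sc are the objects created earlier in the script):

# Confirm which values the --conf job parameters actually produced at runtime.
print(spark.conf.get("spark.sql.parquet.compression.codec", "not set"))
for key, value in sorted(sc.getConf().getAll()):
    if "compression" in key.lower() or "zstd" in key.lower():
        print(key, "=", value)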


Experiment 5 (9/29/2023)

If I only use

--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19

this actually makes the final files come out in Snappy format, e.g. part-00000-1c8c7408-b14f-4ba1-9030-ecc437a2f8d3-c000.snappy.parquet. So spark.io.compression.codec=zstd does not work as expected here either.
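The file name alone could be misleading, so one way to see which codec was actually written is to read the footer of a data file copied locally (sketch using pyarrow; the path is a placeholder for a part file downloaded from the Delta table prefix):

import pyarrow.parquet as pq

# Placeholder: a local copy of one part-*.parquet file from the Delta table.
meta = pq.ParquetFile("local_copy_of_part_file.parquet").metadata

# The footer records the codec per column chunk (e.g. SNAPPY or ZSTD);
# the zstd compression *level* itself is not stored in the footer.
print(meta.row_group(0).column(0).compression)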

Cymophane answered 29/9, 2023 at 19:12 Comment(2)
Can you try spark.io.compression.zstd.level? – Baird
Thanks @ErmiyaEskandary, just tried spark.io.compression.zstd.level=19; unfortunately, still the same size. – Cymophane

The spark.io.compression.codec setting isn't used for the resulting Parquet files - it's used for compressing Spark-internal data such as RDD partitions, event logs, broadcast variables and shuffle outputs (see the documentation).
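To make the distinction concrete, here is a rough sketch of where each family of settings applies when you build a session yourself (in Glue the session comes from GlueContext, so this is illustrative only):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Codec of the Parquet data files produced by the DataFrame/SQL writer:
    .config("spark.sql.parquet.compression.codec", "zstd")
    # Compression of Spark-internal data only (shuffle outputs, broadcast
    # variables, event logs) - it does not change the Parquet output files:
    .config("spark.io.compression.codec", "zstd")
    .config("spark.io.compression.zstd.level", "19")
    .getOrCreate()
)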

From the discussion in the linked Jira, it looks like this is not configurable in Spark (but maybe I need to check the source code for it).

Josettejosey answered 30/9, 2023 at 7:4 Comment(1)
Appreciate it, Alex! – Cymophane
