How do I split / chunk large JSON files with AWS glueContext before converting them to Parquet?

I'm trying to convert a 20GB JSON gzip file to parquet using AWS Glue.

I've set up a job using PySpark with the code below.

I got this log WARN message:

LOG.WARN: Loading one large unsplittable file s3://aws-glue-data.json.gz with only one partition, because the file is compressed by unsplittable compression codec.

I was wondering if there was a way to split / chunk the file? I know I can do it with pandas, but unfortunately that takes far too long (12+ hours).

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
import pyspark.sql.functions
from pyspark.sql.functions import col, concat, reverse, translate
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

test = glueContext.create_dynamic_frame_from_catalog(
             database="test_db",
             table_name="aws-glue-test_table")


# Drop the timestamp field and rename the other fields (still a DynamicFrame here)
reconfigure = test.drop_fields(['timestamp']).rename_field('name', 'FirstName').rename_field('LName', 'LastName').rename_field('type', 'record_type')

# Convert the DynamicFrame to a Spark DataFrame
spark_df = reconfigure.toDF()
# Filter and only return 'a' record types 
spark_df = spark_df.where("record_type == 'a'")
# Once filtered, remove the record_type column
spark_df = spark_df.drop('record_type')
spark_df = spark_df.withColumn("LastName", translate("LastName", "LName:", ""))
spark_df = spark_df.withColumn("FirstName", reverse("FirstName"))

spark_df.write.parquet("s3a://aws-glue-bucket/parquet/test.parquet")

Viscounty answered 21/1, 2022 at 17:24 Comment(0)

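Spark cannot parallelize the read of a single gzip file, because gzip is not a splittable codec. However, you can split the file into chunks yourself.

Spark is also slow at reading gzip files, since the read is not parallelized. If you have a list of gzip files, you can speed things up by decompressing them in parallel: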

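import gzip
# sc is the SparkContext; list_of_files holds paths that gzip.open can read on each executor
file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda path: gzip.open(path).readlines())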
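
For a single large gzip file, one way to produce the list of files this approach needs is to stream the archive once and rewrite it as several smaller gzip files. This is a minimal sketch, assuming the input is newline-delimited JSON (one record per line) and is reachable as a local path. The function name split_gzip_by_lines, the part-NNNNN.json.gz naming, and the demo file are hypothetical, chosen for illustration, and are not part of Glue or Spark:

```python
import gzip
import os
import tempfile


def split_gzip_by_lines(src_path, out_dir, lines_per_chunk):
    """Stream one gzip file into several smaller gzip files, split on line boundaries."""
    os.makedirs(out_dir, exist_ok=True)
    chunk_paths = []
    out = None
    with gzip.open(src_path, "rb") as src:
        for line_number, line in enumerate(src):
            # Start a new output file every `lines_per_chunk` lines.
            if line_number % lines_per_chunk == 0:
                if out is not None:
                    out.close()
                path = os.path.join(out_dir, f"part-{len(chunk_paths):05d}.json.gz")
                chunk_paths.append(path)
                out = gzip.open(path, "wb")
            out.write(line)
    if out is not None:
        out.close()
    return chunk_paths


# Demo: a small temporary file stands in for the real input (assumption).
work_dir = tempfile.mkdtemp()
source = os.path.join(work_dir, "big.json.gz")
with gzip.open(source, "wb") as f:
    for i in range(10):
        f.write(f'{{"id": {i}}}\n'.encode())

parts = split_gzip_by_lines(source, os.path.join(work_dir, "parts"), lines_per_chunk=4)
```

The split is still one sequential pass, since gzip cannot be decompressed from an arbitrary offset, but it streams line by line instead of loading the whole file, and the resulting parts can then be read in parallel.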
Fallacy answered 23/1, 2022 at 7:34 Comment(3)
Thanks! In my code, would I do this after converting the dynamic frame into a Spark dataframe? - Viscounty
Sorry, I meant ``` glueContext = GlueContext(SparkContext.getOrCreate().parallelize ...) ``` - Viscounty
Does that work with a single very large gz file, with list_of_files holding one string? - Pastille

I had this problem with a single large unsplittable CSV file that was gzip-compressed. I believe the accepted answer only works for a list of files.

I used the new AWS Glue for Ray and awswrangler (AWS SDK for pandas) to read the file in batches and spread them across partitions, as follows:

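import awswrangler as wr
import ray
from ray import data

ray.init('auto')
# With chunksize set, read_csv returns an iterator of DataFrames instead of one large frame
large_dataset = wr.s3.read_csv(
    's3://path_to_large_csv.gz',
    sep=',',
    header=0,           # first row holds the column names (a bool is not accepted here)
    chunksize=100_000,  # rows per batch
)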


@ray.remote
def read_batch(batch):
    # Pass-through task: each chunk ends up as an object in the Ray object store
    return batch

futures = [read_batch.remote(part) for part in large_dataset]

large_distributed_dataset = data.from_pandas(ray.get(futures))
large_distributed_dataset.write_parquet("s3://path_to_output/")

The awswrangler[modin] package has to be added to the job with the --pip-install job parameter, as described in the AWS Glue docs.
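
To make the batching idea concrete, here is a rough standard-library sketch of what reading a gzip-compressed CSV in fixed-size batches amounts to. It does not use awswrangler or Ray. The helper name iter_csv_batches and the demo file are hypothetical, and the sketch assumes the file has a header row:

```python
import csv
import gzip
import itertools
import os
import tempfile


def iter_csv_batches(path, batch_size):
    """Yield lists of row dicts from a gzip-compressed CSV, at most `batch_size` rows each."""
    with gzip.open(path, "rt", newline="") as handle:
        reader = csv.DictReader(handle)
        while True:
            # islice pulls the next `batch_size` rows without reading the rest of the file.
            batch = list(itertools.islice(reader, batch_size))
            if not batch:
                break
            yield batch


# Demo: a tiny temporary CSV stands in for the real input (assumption).
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv.gz")
with gzip.open(demo_path, "wt", newline="") as handle:
    writer = csv.writer(handle)
    writer.writerow(["id", "name"])
    for i in range(5):
        writer.writerow([i, f"row{i}"])

batches = list(iter_csv_batches(demo_path, batch_size=2))
batch_sizes = [len(batch) for batch in batches]
```

Each batch can be handed to a separate worker or written out as its own Parquet part, which is the role the chunksize argument plays in the code above.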

Pastille answered 3/1 at 17:35 Comment(0)
