For our use case we need to load JSON files from an S3 bucket. As the processing tool we are using AWS Glue, but because we will soon be migrating to Amazon EMR, we are already developing our Glue jobs with Spark functionality only, so that the future migration will be easier. This means that for our use case we can't use any Glue-specific functionality such as grouping input files.
The problem we are facing is that when we read these JSON files, the driver's memory climbs to 100% until the job eventually fails with OOM exceptions.
We already tried maxing out the driver memory by using G.2X instances and adding the --conf spark.driver.memory=20g argument to our Glue job.
The code we are using is as simple as:
spark.read.option("inferSchema", value = true).json("s3://bucket_with_json/sub_folder")
The input data is 21 JSON files with a size of 100 MB. The files themselves are not valid JSON documents; instead, each file contains multiple concatenated JSON objects, for example:
{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
}
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}
(not the real dataset)
The Glue Job specs we are currently using:
- Worker type: G.2X
- Number of workers: 20
- Additional Spark arguments:
'--conf': 'spark.driver.maxResultSize=2g --conf spark.yarn.executor.memory=7g --conf spark.driver.memory=20g'
- Job language: Scala
- Glue version: 3.0
This visual shows how the driver's memory exceeds the maximum, compared to the memory of the executors:
And the error we get after ±10 minutes is:
Command Failed due to Out of Memory
java.lang.OutOfMemoryError: Java heap space -XX:OnOutOfMemoryError="kill -9 %p" Executing /bin/sh -c "kill -9 8"...
Also worth noting: when we run the job on a smaller set of data, everything works fine.
I'm kind of out of options at this point. Can someone help me fix this or point me in the right direction? Also, could someone explain why my driver gets flooded? I always thought that the JSON files are read by the executors. I'm not collecting any data to the driver after reading it in, so I can't explain why this is happening.
** EDIT **
I tried converting each input file to one valid JSON document, i.e. transforming it to this format:
[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
And used the option:
.option("multiline", "true")
But unfortunately this gives me the same result/error.
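For clarity, the read in that attempt was roughly equivalent to the following (a sketch only, using the same placeholder path as in the snippet above):

// Treat each file as one (possibly multi-line) JSON document instead of line-delimited JSON.
val df = spark.read
  .option("multiline", "true")
  .option("inferSchema", value = true)
  .json("s3://bucket_with_json/sub_folder")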
** EDIT **
I would like to add that the data example above and its structure do not resemble the data I am actually using. To give you some information about my data:
The structure is very nested. It contains 25 top-level fields, 7 of which are nested. If you flatten everything, you end up with roughly 200 fields. It is possible that the inferSchema option is the cause of my issue.
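If inferSchema is indeed the problem, a possible workaround (just a sketch with made-up field names, not my real 200-field schema) would be to define an explicit schema and pass it to the reader, so that Spark skips the inference step entirely:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// In a Glue job the SparkSession already exists; getOrCreate() simply reuses it.
val spark = SparkSession.builder().getOrCreate()

// Hypothetical schema: the real one would describe the nested, ~200-field structure.
val schema = StructType(Seq(
  StructField("RecordNumber", LongType, nullable = true),
  StructField("Zipcode", LongType, nullable = true),
  StructField("ZipCodeType", StringType, nullable = true),
  StructField("City", StringType, nullable = true),
  StructField("State", StringType, nullable = true)
))

// With an explicit schema Spark does not run the schema-inference pass,
// so the driver no longer has to merge inferred schemas from the input files.
val df = spark.read
  .schema(schema)
  .json("s3://bucket_with_json/sub_folder")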
Comments:
- You could parse with jsoniter-scala directly from a java.io.FileInputStream into your case classes using a scanJsonValuesFromStream call at ~1 GB/sec, without needing to hold the whole input and/or output in memory. – Purpura
- Have you tried setting samplingRatio to something less than 1? – Electricity
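For reference, the samplingRatio suggestion from the comments would look roughly like this (a sketch; 0.1 is an arbitrary example value that makes Spark infer the schema from about 10% of the records):

// Infer the schema from a fraction of the JSON records instead of all of them.
val df = spark.read
  .option("samplingRatio", "0.1")
  .json("s3://bucket_with_json/sub_folder")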