Unable to read json files in AWS Glue using Apache Spark

For our use case we need to load JSON files from an S3 bucket, and we are using AWS Glue as the processing tool. Because we will soon be migrating to Amazon EMR, we are already developing our Glue jobs with Spark functionality only, so the future migration will be easier. This means that for our use case we can't use any Glue-specific functionality like grouping input files.

The problem we are facing is that when we read these JSON files, the driver's memory climbs to 100% until the job eventually fails with an out-of-memory (OOM) exception.

We already tried maxing out the driver memory by using G.2X instances and adding the --conf spark.driver.memory=20g argument to our Glue job.

The code we are using is as simple as:

spark.read.option("inferSchema", value = true).json("s3://bucket_with_json/sub_folder")

The input data consists of 21 JSON files with a size of 100 MB. The files themselves are not single valid JSON documents; rather, each file contains multiple JSON objects, for example:

{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
}
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}

(not the real dataset)

The Glue Job specs we are currently using:

  • Worker type: G.2X
  • Number of workers: 20
  • Additional Spark arguments: '--conf': 'spark.driver.maxResultSize=2g --conf spark.yarn.executor.memory=7g --conf spark.driver.memory=20g'
  • Job language: scala
  • Glue version: 3.0

This visual shows how the driver's memory exceeds its maximum, compared to the memory of the executors:

[Memory usage graph: driver memory vs. executor memory]

And the error we get after roughly 10 minutes is:

Command Failed due to Out of Memory

java.lang.OutOfMemoryError: Java heap space -XX:OnOutOfMemoryError="kill -9 %p" Executing /bin/sh -c "kill -9 8"...

Also worth noting that when we are running on a smaller set of data, everything works fine.

I'm kind of out of options at this point. Can someone help me fix this or point me in the right direction? Also, could someone explain why my driver gets overwhelmed? I always thought the JSON files were read by the executors, and I'm not collecting any data to the driver after reading it in, so I can't explain why this is happening.

** EDIT **

I tried converting the input files to a single valid JSON document each, i.e. transforming them to this format:

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

And used option:

.option("multiline", "true")

But unfortunately this gives me the same result/error.
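
For reference, the combined read then looked roughly like this (same illustrative path as in the original snippet):

spark.read
  .option("multiline", value = true)
  .option("inferSchema", value = true)
  .json("s3://bucket_with_json/sub_folder")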

** EDIT **

I would like to add that the data example above and its structure do not resemble the data I am actually using. To give you some information about my data:

The structure is very nested. It contains 25 top-level fields, 7 of which are nested; if you flatten everything you end up with roughly 200 fields. It is possible that the inferSchema option is the cause of my issue.

Weinrich answered 24/1, 2023 at 15:38 Comment(4)
You can copy the files locally and then parse your multiline JSON with jsoniter-scala directly from java.io.FileInputStream into your case classes using the scanJsonValuesFromStream call, at ~1 GB/sec, without needing to hold the whole input and/or output in memory.Purpura
Have you tried a) defining a schema yourself (instead of using inferSchema=true), OR b) setting samplingRatio to something less than 1?Electricity
Also, I assume all your files are of valid (from Spark's pov) notation -- "Each line must contain a separate, self-contained valid JSON object. For more information, please see JSON Lines text format, also called newline-delimited JSON."Electricity
@Electricity I tried with samplingRatio 0.1, but the driver still runs OOM.Weinrich

I think setting inferSchema to true could be the problem, as schema inference is performed on the driver. So why infer the schema while reading (it requires an extra pass over the data and more driver resources)? Maybe the reason is lost in this toy example, but maybe you can try this?

First... your second file format worked fine (the first did not)... I created a few files like this and stuck them all in a folder on S3.

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

One alternative I'd try is to provide the schema yourself when you read.

import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }

val schema = {
    new StructType()
      .add(StructField("RecordNumber", IntegerType, false))
      .add(StructField("Zipcode", IntegerType, true))
      .add(StructField("ZipCodeType", StringType, true))
      .add(StructField("City", StringType, true))
      .add(StructField("State", StringType, true))
  }

val df = spark.read.option("multiline", value = true).schema(schema).json("s3://bucket/path/")

One other thing to try... just skip inferring the schema while reading. I don't know if the following uses driver resources in the same way, but I seem to recall that it may use a small subset of rows.

val df = spark.read.option("multiline", value = true).json("s3://bucket/path/")

val schema = df.schema
schema.printTreeString

df.printSchema
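
If you do let Spark infer the schema, another knob (mentioned in the comments on the question) is samplingRatio, which limits the fraction of the input used for inference. A minimal sketch with an illustrative path; the comments report it didn't help in this particular case, but it's cheap to try:

// infer the schema from roughly 10% of the input objects instead of all of them
val dfSampled = spark.read
  .option("multiline", value = true)
  .option("samplingRatio", 0.1)
  .json("s3://bucket/path/")

dfSampled.printSchema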

EDIT - in response to a comment indicating the above was no good

One last thing to try... here I'm just trying to get the driver out of the mix, so I'm doing the following...

  1. reading it in as plain text with JSON records over multiple lines
  2. using .mapPartitions to iterate over each partition and merge JSON that is split over multiple lines into 1 record per JSON string
  3. finally... parse into JSON using your favorite parser (I use json4s for no particular reason)

If after this, you still run into memory errors, it should be on the executors, where you have more options.

Of course, if you're looking for Spark to automatically read everything into a 200-column dataframe, maybe you're just going to need a bigger driver.

So here's the function that iterates over the lines of text and tries to merge them into one record per line. This works for the toy example, but you'll likely have to do something smarter.

.mapPartitions treats each partition as an iterator... so you need to give it a function of type Iterator[A] => Iterator[B], which in this case is just a .foldLeft that uses a regex to figure out whether a line ends a record.

import org.apache.spark.rdd.RDD // RDD because that's what I use; probably similar on dataframes
import org.json4s._             // json4s for no particular reason
import org.json4s.jackson.JsonMethods._

/** `RDD.mapPartitions` treats each partition as an iterator,
  * so use a .foldLeft over the partition and a little regex
  * to merge multiple lines into one record.
  *
  * probably needs something smarter for more deeply nested JSON
  */
val mergeJsonRecords: (Iterator[String] => Iterator[String]) = (oneRawPartition) => {
  // val patternStart = "^\\[?\\{".r
  val patternEnd = "(.*?\\})[,\\]]?$".r // end of JSON record
  oneRawPartition
    .foldLeft(List[String]())((list, next) => list match {
      case Nil => List(next.trim.drop(1))
      case x :: Nil => { 
        x.trim match {
          case patternEnd(e) => List(next.trim, e)
          case _ => List(x + next.trim)
        }
      }
      case x :: xs => {
        x.trim match {
          case patternEnd(e) => next.trim :: e :: xs
          case _ => x.trim + next.trim :: xs
        }
      }
    })
    .map { case patternEnd(e) => e; case x => x } // lame way to clean up the last JSON record in each partition
    .iterator
}

Here we just read the data in... merge the lines... then parse. Again, I'm using an RDD because it's what I usually use, but I'm sure you can keep it in a dataframe if you need to.

// read JSON in as plain text; each JSON record over multiple lines
val rdd: RDD[String] = spark.read.text("s3://bucket/path/").rdd.map(_.getAs[String](0))
rdd.count // 56 rows == 8 records

// one record per JSON object
val rdd2: RDD[String] = rdd.mapPartitions(mergeJsonRecords)
rdd2.collect.foreach(println)
rdd2.count // 8

// parsed JSON
object Parser extends Serializable {
  implicit val formats = DefaultFormats
  val func: (String => JValue) = (s) => parse(s)
}
val rddJson = rdd2.map(Parser.func)
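
A variant of the first approach, also suggested in the comments below: infer the schema once from a single representative file, save it, and reuse it when reading the whole directory. A rough sketch, assuming the files share a schema; the file name is illustrative:

// infer the schema from one small, representative file only
val sampleDf = spark.read.option("multiline", value = true).json("s3://bucket/path/one_file.json")
val savedSchema = sampleDf.schema

// reuse the saved schema for the full directory so the big read skips inference entirely
val fullDf = spark.read
  .option("multiline", value = true)
  .schema(savedSchema)
  .json("s3://bucket/path/")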
Quintuplicate answered 27/1, 2023 at 1:40 Comment(4)
Thanks for the insights about inferSchema. The problem is that the source data has a lot of columns (200+) and is very nested. On top of that, the schema changes periodically without the data team being informed of any changes, so defining the schema upfront ranges from difficult to impossible, I'm afraid. I tried the 'printTreeString' solution but this also overwhelms the driver.Weinrich
You could use the top approach... one time, load a single file instead of a whole directory, let the driver infer the schema and save it as a variable (val schema = df.schema). Then load the whole directory while providing the saved schema inferred from the single file.Quintuplicate
One last new attempt... after the EDIT I put up an attempt that reads it in as raw text and you manually merge into single JSON records. Here you're going to then have to manually manipulate it rather than have the driver infer the schema and dump it into a 200 column data frame for you, so maybe not what you're looking for.Quintuplicate
Loading in a single file and taking that schema would work, I suppose. But the problem is that it's not guaranteed that the schema of one file matches that of the next; the number of fields can differ for each JSON object. I will investigate your second edit.Weinrich

I encountered the exact same issue and have been working on it for the last few days. After numerous rounds of trial and error, I realized that the root cause is schema inference.

Here are some key points to note:

  1. Since the JSON structure is highly dynamic, the driver attempts to infer the schema for every individual object, which overwhelms it. This leads to a driver OOM (out of memory), even though no actions are performed on the DataFrame.
  2. The error message doesn't explicitly point to the root cause, making it difficult to identify it as an infer schema issue.

Resolution: My JSON object is very complex, both in terms of the number of columns per layer and the number of nested layers. Therefore, it's more efficient to define the schema as much as possible upfront. To address the issue, I defined a schema for only the first layer of the object and treated all nested data within it as strings, as shown below:

from pyspark.sql.types import StructType, StructField, StringType

# schema for the first layer only; the nested values (the S/N/M wrappers) are read as plain strings
schema = StructType([
    StructField("Item", StructType([
        StructField("column1", StructType([ StructField("S", StringType(), True) ]), True),
        StructField("column2", StructType([ StructField("N", StringType(), True) ]), True),
        StructField("column5", StructType([ StructField("M", StringType(), True) ]), True)
    ]), True)
])

df = spark.read.json(input_path, schema=schema)
df.printSchema()

This worked for me.

Flub answered 23/9 at 21:25 Comment(0)
