Spark + Json4s serialization problems

I am using Json4s classes inside a Spark 2.2.0 closure. The usual "workaround" for the failure to serialize DefaultFormats is to define it inside every closure executed by Spark that needs it. I believe I have done more than necessary below, but I still get the serialization failure.

Using Spark 2.2.0, Scala 2.11, and Json4s 3.2.x (whatever ships with Spark); I also tried Json4s 3.5.3 by pulling it into my job with sbt. In all cases I used the workaround shown below.

Does anyone know what I'm doing wrong?

import org.json4s._ // DefaultFormats, JsonAST, JArray

logger.info(s"Creating an RDD for $actionName")
implicit val formats = DefaultFormats
val itemProps = df.rdd.map[(ItemID, ItemProps)](row => { // <-- error points to this line
  implicit val formats = DefaultFormats
  val itemId = row.getString(0)
  val correlators = row.getSeq[String](1).toList
  (itemId, Map(actionName -> JArray(correlators.map { t =>
    implicit val formats = DefaultFormats
    JsonAST.JString(t)
  })))
})

I have also tried another suggestion, which is to set the DefaultFormats implicit in the class constructor area rather than in the closure; no luck anywhere.

The JVM error trace is Spark complaining that the task is not serializable, pointing to the line marked above (the last line of my code); the root cause is then explained with:

Serialization stack:
- object not serializable (class: org.json4s.DefaultFormats$, value: org.json4s.DefaultFormats$@7fdd29f3)
- field (class: com.actionml.URAlgorithm, name: formats, type: class org.json4s.DefaultFormats$)
- object (class com.actionml.URAlgorithm, com.actionml.URAlgorithm@2dbfa972)
- field (class: com.actionml.URAlgorithm$$anonfun$udfLLR$1, name: $outer, type: class com.actionml.URAlgorithm)
- object (class com.actionml.URAlgorithm$$anonfun$udfLLR$1, <function3>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, name: func$4, type: interface scala.Function3)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[2, bigint, false], input[3, bigint, false], input[5, bigint, false]))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 128 more
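
Reading the serialization stack, the non-serializable value is the formats field on the URAlgorithm instance itself, pulled in because the udfLLR anonymous function keeps an $outer reference to the enclosing class; the map closure shown above is not the one being captured. Below is a minimal sketch of the pattern the trace describes (class and field names are taken from the trace; the UDF body is a hypothetical stand-in):

import org.apache.spark.sql.functions.udf
import org.json4s.DefaultFormats

class URAlgorithm extends Serializable {
  // class-level field; the org.json4s.DefaultFormats$ singleton is not Serializable
  implicit val formats = DefaultFormats

  // referencing a class member from the lambda captures $outer = this,
  // so Spark must serialize the whole URAlgorithm instance, formats included
  val udfLLR = udf { (n1: Long, n2: Long, n3: Long) =>
    org.json4s.jackson.Serialization.write(Map("llr" -> (n1 + n2 + n3)))(formats)
  }
}

Marking that field @transient, or creating formats inside the function body instead, takes the non-serializable field out of the captured closure.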
Ethnology answered 26/1, 2018 at 1:31
Did you resolve it? I think you should create the Json4s instance inside the map function; otherwise the instance is created on the driver node but used on an executor node. - Boren

Here is another example; you can try it in spark-shell. I hope it helps.

import org.json4s._
import org.json4s.jackson.JsonMethods._

def getValue(x: String): (Int, String) = {
  // formats is created inside the function, so it is instantiated on the
  // executor rather than captured from the driver
  implicit val formats: DefaultFormats.type = DefaultFormats
  val obj = parse(x).asInstanceOf[JObject]
  val id = (obj \ "id").extract[Int]
  val name = (obj \ "name").extract[String]
  (id, name)
}

val rdd = sc.parallelize(Array(
  """{"id":0, "name":"g"}""",
  """{"id":1, "name":"u"}""",
  """{"id":2, "name":"c"}""",
  """{"id":3, "name":"h"}""",
  """{"id":4, "name":"a"}""",
  """{"id":5, "name":"0"}"""
))
rdd.map(x => getValue(x)).collect
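
For reference, collect should return the pairs in input order (sc.parallelize preserves element ordering), roughly:

res0: Array[(Int, String)] = Array((0,g), (1,u), (2,c), (3,h), (4,a), (5,0))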
Squeak answered 22/7, 2018 at 11:50

Interesting. One typical problem is running into serialization issues with the implicit val formats, but since you define it inside your closure this should be OK.

I know this is a bit hacky, but you could try the following:

  1. using a @transient implicit val (see the sketch after this list)
  2. doing a minimal test of whether JsonAST.JString(t) on its own is serializable
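
For point 1, a minimal sketch of what that could look like (MyJob is a hypothetical stand-in for the class that owns the closure; pairing @transient with lazy means the field is skipped during serialization and re-created on each executor on first use):

import org.json4s.{DefaultFormats, Formats}

class MyJob extends Serializable { // hypothetical owner of the Spark closure
  // @transient: not serialized with the instance
  // lazy: re-initialized on the executor the first time it is touched there
  @transient implicit lazy val formats: Formats = DefaultFormats
}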
Intercourse answered 8/10, 2018 at 13:59
