I am using Json4s classes inside a Spark 2.2.0 closure. The usual "workaround" for the failure to serialize DefaultFormats is to include its definition inside every closure executed by Spark that needs it. Below I have done that more times than should be necessary, yet I still get the serialization failure.
I am on Spark 2.2.0 with Scala 2.11 and Json4s 3.2.x (whatever ships with Spark); I also tried Json4s 3.5.3 by pulling it into my job with sbt. In all cases I used the workaround shown below.
Does anyone know what I'm doing wrong?
import org.json4s.{DefaultFormats, JsonAST}
import org.json4s.JsonAST.JArray

logger.info(s"Creating an RDD for $actionName")
implicit val formats = DefaultFormats
val itemProps = df.rdd.map[(ItemID, ItemProps)](row => { // <-- error points to this line
  implicit val formats = DefaultFormats // workaround: redefine inside the Spark closure
  val itemId = row.getString(0)
  val correlators = row.getSeq[String](1).toList
  (itemId, Map(actionName -> JArray(correlators.map { t =>
    implicit val formats = DefaultFormats // and again inside the nested closure
    JsonAST.JString(t)
  })))
})
I have also tried another suggestion, which is to define the DefaultFormats implicit in the class constructor area rather than in the closure; no luck anywhere.
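For reference, that attempt looked roughly like this (a minimal sketch; the class name and the formats field match what the serialization stack below reports):

import org.json4s.DefaultFormats

class URAlgorithm /* constructor params omitted */ {
  // Suggested placement: one class-level implicit instead of one per closure.
  // The serialization stack below shows this field is exactly what Spark
  // fails to serialize once a closure captures `this`.
  implicit val formats: DefaultFormats.type = DefaultFormats
  // ... rest of the class unchanged ...
}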
The JVM error trace is from Spark complaining that the task is not serializable, pointing at the line flagged above (the last line in my code, anyway); the root cause is then explained with:
Serialization stack:
- object not serializable (class: org.json4s.DefaultFormats$, value: org.json4s.DefaultFormats$@7fdd29f3)
- field (class: com.actionml.URAlgorithm, name: formats, type: class org.json4s.DefaultFormats$)
- object (class com.actionml.URAlgorithm, com.actionml.URAlgorithm@2dbfa972)
- field (class: com.actionml.URAlgorithm$$anonfun$udfLLR$1, name: $outer, type: class com.actionml.URAlgorithm)
- object (class com.actionml.URAlgorithm$$anonfun$udfLLR$1, <function3>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, name: func$4, type: interface scala.Function3)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[2, bigint, false], input[3, bigint, false], input[5, bigint, false]))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 128 more
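Reading the serialization stack, the field actually being serialized appears to be the class-level formats on URAlgorithm, dragged in because the udfLLR UDF captures its enclosing instance (the $outer field), rather than the rdd.map shown above. One variant I could still try (a minimal sketch, assuming DefaultFormats can simply be re-created on each executor) is to mark that field @transient lazy so Java serialization skips it whenever `this` is captured:

import org.json4s.DefaultFormats

class URAlgorithm /* constructor params omitted */ extends Serializable {
  // @transient: skipped when Spark serializes a closure that captured `this`.
  // lazy: re-created on first use on each executor, so the implicit still resolves.
  @transient implicit lazy val formats: DefaultFormats.type = DefaultFormats
}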