Task not serializable exception when trying to write an RDD of type GenericRecord

I have a DStream of type GenericData.Record which I am trying to write to HDFS in Avro format:

val file = File.createTempFile("temp", ".avro")
val schema = new Schema.Parser().parse(st)
val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
dataFileWriter.create(schema, file)
rdd.foreach(r => {
  dataFileWriter.append(r)
})
dataFileWriter.close()

However, I'm getting this Task not serializable error:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2062)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:217)
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:210)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.avro.file.DataFileWriter
Serialization stack:
- object not serializable (class: org.apache.avro.file.DataFileWriter, value: org.apache.avro.file.DataFileWriter@78f132d9)
- field (class: KafkaCo$$anonfun$main$3$$anonfun$apply$1, name: dataFileWriter$1, type: class org.apache.avro.file.DataFileWriter)
- object (class KafkaCo$$anonfun$main$3$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
Neoterism answered 13/6, 2017 at 14:47 Comment(1)
What are you trying to achieve by writing the RDD object into an Avro file? You should take a look at github.com/databricks/spark-avro, which lets you save a DataFrame directly into Avro-formatted files using something like df.write.avro("/tmp/output"). (Dote)

The key point here is that the DataFileWriter is a local resource (bound to a local file), so serializing it does not make sense.

Adapting the code to use something like mapPartitions will not help either, as such an executor-bound approach will write files to the local filesystem of the executors.

We need to use an implementation that supports the distributed nature of Spark, for example https://github.com/databricks/spark-avro.

Using that library, given a schema represented by a case class, we would do:

// Requires the spark-avro library (import com.databricks.spark.avro._)
// and the DataFrame implicits (e.g. import spark.implicits._) for toDF().
val structuredRDD = rdd.map(record => recordToSchema(record)) // recordToSchema converts GenericData.Record to your case class
val df = structuredRDD.toDF()
df.write.avro(hdfs_path)
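
For reference, a hypothetical recordToSchema might look like the following sketch. The Event case class and its field names are made up for illustration; adapt them to whatever your Avro schema actually contains:

// Hypothetical example: the case class and field names are illustrative only.
// Define the case class at the top level (not inside a method) so Spark can derive an encoder for toDF().
import org.apache.avro.generic.GenericData

case class Event(id: String, value: Long)

def recordToSchema(record: GenericData.Record): Event =
  Event(
    record.get("id").toString,              // Avro strings come back as Utf8, so call toString
    record.get("value").asInstanceOf[Long]  // assumes a long field named "value" in the schema
  )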
Britanybritches answered 13/6, 2017 at 15:35 Comment(6)
What's recordToSchema? (Neoterism)
A function that you write to convert the record format to your case class. (Britanybritches)
Can you provide an example? I haven't done this before. (Neoterism)
@Neoterism Well, I see that GenericData.Record is already an Avro format. Maybe there are smarter ways to directly save it as Avro; I don't know. (Britanybritches)
Does this work for streaming as well? The question mentions DStreams. (Infidelity)
@Infidelity Sure, as long as the path is different on each streaming interval, for example by appending a timestamp. Although this approach should work, I would think there's an easier way to go from a GenericData.Record to an Avro file. (Britanybritches)
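
A minimal sketch of that per-interval write, assuming the spark-avro import, the implicits for toDF(), and a recordToSchema conversion as above; dstream and hdfs_path are placeholders:

// Sketch only: writes each micro-batch to its own time-stamped directory.
// Assumes import com.databricks.spark.avro._ and import spark.implicits._ are in scope.
dstream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    val df = rdd.map(recordToSchema).toDF()
    df.write.avro(s"$hdfs_path/batch-${time.milliseconds}")
  }
}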

Since lambdas have to be distributed around the cluster to run, they must reference only serializable data so that they can be serialized, shipped to the different executors, and executed there as tasks.

What you could probably do is the following (see the sketch after this list):

  • create a new file and obtain a handle to it
  • use the mapPartitions (instead of map) method and create a new writer for each partition
  • use the file handle with the writer you create for each partition to append each message within the partition to that file
  • ensure the file handle is closed when the stream is fully consumed
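
A rough sketch of the per-partition writer idea, using foreachPartition (the action counterpart of mapPartitions) since we only need the write side effect. Note, as the comments below point out, that each file lands on that executor's local disk; dstream and schemaString are placeholders for your stream and a plain schema string:

// Sketch only: create, use, and close the Avro writer entirely inside the partition closure,
// so nothing non-serializable is captured by the task.
import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val schema = new Schema.Parser().parse(schemaString)   // schemaString: a plain (serializable) String
    val file = File.createTempFile("partition-", ".avro")  // note: this is the executor's local filesystem
    val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
    val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
    dataFileWriter.create(schema, file)
    try {
      records.foreach(r => dataFileWriter.append(r))
    } finally {
      dataFileWriter.close()
    }
  }
}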
Infidelity answered 13/6, 2017 at 15:10 Comment(3)
Where am I using map here? Some pseudocode would be very helpful. (Neoterism)
Although valid advice in general, I don't think that mapPartitions and local objects will help in this case. Note how this dataFileWriter is bound to a local file: dataFileWriter.create(schema, file). Using mapPartitions we would create a local file on each executor. (Britanybritches)
That's why I suggested passing around a file handle to be closed when the stream is fully consumed, rather than the current mode of operation. Today I'll hopefully have time to edit my answer with some code. (Infidelity)
