Spark Streaming: Broadcast variables, java.lang.ClassCastException
Asked Answered
A

1

7


I try to read data from a static textfile stored in HDFS, store its content into an ArrayBuffer which in turn should be broadcasted via sparkContext.broadcast as a BroadcastVariable. I am using cloudera's spark, spark version 1.6.0-cdh5.7.0 and spark-streaming_2.10.

I start the application on yarn using spark-submit:

spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar

When I do this, I get an java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator The same code used with a hard coded ArrayBuffer work perfectly so I assume it has something to do with the static file resource... Does anyone have an idea what I am possibly doing wrong? Any help appreciated.

This does not work:


    object BroadcastStreamTest1 {

        def main(args: Array[String]) {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            val content = streamingContext.sparkContext
                .textFile("hdfs:///data/someTextFile.txt")
                .collect()
                .toBuffer[String]

            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }

This works:


    object BroadcastStreamTest2 {

        def main(args: Array[String]) {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            val content = new mutable.ArrayBuffer[String]
            (1 to 50).foreach(i => content += "line" + i)

            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }

Stacktrace:

16/04/25 10:09:59 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, n525.hadoop.mxint.net): java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
        at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
        ... 30 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at net.meetrics.dada.streaming.application.BroadcastStreamTest1$.main(BroadcastStreamTest1.scala:14)
        at net.meetrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
        at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
        ... 30 more 
Archiplasm answered 25/4, 2016 at 9:36 Comment(3)
Hmm... I'm thinking that scala.collection.mutable.Buffer[A] (the type of your content variable in the non-working example) is not the same as scala.collection.mutable.ArrayBuffer[A] (the type of your content variable in your working example) and I'm thinking that if you expect your broadcasted variable to be of type ArrayBuffer[A], you will definitely get a ClassCastException when you pass it a Buffer[A].Pentagram
That does not seem to be the problem. I did not declare my broadcast variable to expect a specific type. I tried it also with toList and by creating a new ArrayBuffer and inserting all elements from the collected array. I think may have seomting to do with the static file ressouce but I am not sure, it also works in the spark shell using the provided sparkContext.Archiplasm
I figured out that the problem has to do with setting spark.executor.userClassPathFirst=true, maybe some version problem... I'll answer my question when I found out the reason,Archiplasm
A
2

The reason was some kind of conflict with my provided jar file.

Without setting

spark.executor.userClassPathFirst=true

it works, unfortunately I could not locate the exact cause of the problem.

Archiplasm answered 26/4, 2016 at 11:13 Comment(1)
I don't use this conf, but receive the same error. I'm still trying to found correct answer, but seems that the Scala version between your code and Spark in use is incompatible.Primo

© 2022 - 2024 — McMap. All rights reserved.