ConcurrentModificationException when using Spark collectionAccumulator

I'm trying to run a Spark-based application on an Azure HDInsight on-demand cluster, and am seeing lots of SparkExceptions (caused by ConcurrentModificationExceptions) being logged. The application runs without these errors when I start a local Spark instance.

I've seen reports of similar errors when using accumulators, and my code is indeed using a CollectionAccumulator; however, I have placed synchronized blocks everywhere I use it, and it makes no difference. The accumulator-related code looks like this:

import scala.collection.JavaConverters._

// The base class providing add/endOfBatch is elided in this excerpt.
class MySparkClass(sc: SparkContext) {
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    override def add(record: MyRecord) = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    override def endOfBatch() = {
        synchronized {
            // CollectionAccumulator#value returns a java.util.List, hence asScala
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}

The exceptions don't cause the application to fail; however, when endOfBatch is called and the code tries to read values out of the accumulator, it is empty and processIt is never called.

We are using HDInsight version 3.6 with Spark version 2.3.0. The relevant log output:

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:770)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more

The following code is a more self-contained example that reproduces the problem. MyRecord is a simple case class containing only numeric values. The code runs without error locally, but on an HDInsight cluster it produces the error above.

import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

// MyRecord is defined elsewhere as a simple case class of numeric fields.
object MainDemo {
    def main(args: Array[String]) {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for (i <- 1 to 100000) {
                val record = MyRecord(i, 0, 0)
                myAccumulator.add(record)
            }
        })

        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            // we expect this to be called once for each record that we 'add' above,
            // but it is never called
            println(record)
        })
    }
}
Hukill answered 26/11/2018 at 15:53
This might sound like a stupid suggestion, but I'm not entirely convinced that this error comes from your accumulator. There are two tests I'd like to suggest: 1) make a fake run of your job with MySparkClass modified in such a way that all the (fake) data is pre-added to your accumulator in the constructor and it is then never modified by add; 2) make a test run with the code modified to use copy-on-write logic: make myAccumulator a var instead of a val and use a copy-add-assign cycle instead of a plain add. This is 100% thread-safe but very slow. I bet the errors will still be there. – Rolanderolando
Link to the bug report raised on the Apache site: issues.apache.org/jira/browse/SPARK-26183 – Hukill

I doubt that having a synchronized block really helps. Custom accumulators, like all other accumulators, are not thread-safe. They don't really have to be, since the DAGScheduler.updateAccumulators method that the Spark driver uses to update the values of accumulators after a task completes (successfully or with a failure) is only executed on the single thread that runs the scheduling loop. Besides that, they are write-only data structures for the workers, which have their own local accumulator reference, whereas accessing the value of an accumulator is only allowed to the driver. As for why it works in local mode: there everything runs in a single JVM, but in cluster mode the driver and executors are separate JVMs and Java instances, and RPC calls are triggered to enable the communication between them.

It would also help to see what your MyRecord object looks like. And try ending your line with just .value rather than iterating over it:

myAccumulator.value
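
For illustration, a minimal, self-contained sketch of the pattern described above (the accumulator name and data here are made up): executors only ever add to their local copy of the accumulator inside an action, and .value is read on the driver once the action has returned.

import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object AccumulatorReadOnDriver {
    def main(args: Array[String]): Unit = {
        val sc = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val acc = sc.collectionAccumulator[Int]("demo")

        // Executors write only to their local copy; the merged result is sent
        // back to the driver as each task completes.
        sc.parallelize(1 to 10).foreach(i => acc.add(i))

        // Read .value on the driver only, after the action has returned.
        // CollectionAccumulator#value returns a java.util.List, hence asScala.
        acc.value.asScala.foreach(println)
    }
}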
Carpentry answered 29/11/2018 at 13:33
The add method is called from executor code and endOfBatch is called from driver code. I'm not sure what you are asking re: local mode, but when I run the app on my local PC it works, using a master URL of local[4]. If you need further info, just ask. – Hukill
I need to iterate over the values that have been added to the accumulator; if I make the edit you suggest, then how will processIt get called? The MyRecord class is a simple case class with various numeric and boolean fields. – Hukill
Can you show us the code that uses your class (or at least a minimal example that reproduces the error)? – Biconcave
@Biconcave I've added an example to the question. – Hukill
That's strange... I managed to run your code locally as well and got the expected behavior. Out of curiosity, why are you using an accumulator to collect that much data to the driver? – Biconcave
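
Following up on the comment above: if the goal is simply to get all the records back to the driver, an RDD transformation plus collect() avoids the accumulator entirely. A minimal sketch, reusing the path and record shape from the question's demo (the MyRecord field layout here is a made-up stand-in):

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the question's MyRecord case class.
case class MyRecord(a: Int, b: Int, c: Int)

object CollectInsteadOfAccumulator {
    def main(args: Array[String]): Unit = {
        val sc = SparkSession.builder.master("local[4]").getOrCreate().sparkContext

        // Build the records as an RDD transformation and bring them back to the
        // driver with collect(), instead of funnelling them through an accumulator.
        val records = sc.binaryFiles("/my/files/here")
            .flatMap(_ => (1 to 100000).map(i => MyRecord(i, 0, 0)))
            .collect()

        records.foreach(println)
    }
}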

It makes sense to read the accumulator only after some action has been called on the RDD (e.g. collect or count).

Also, you don't need to synchronize on the accumulator, since an independent copy of it is allocated per partition.
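
To illustrate both points, a minimal sketch (the accumulator name and data are made up): each partition's tasks work against their own local copy of the accumulator, so no synchronized block is needed, and the merged value is read on the driver only after the action has completed.

import org.apache.spark.sql.SparkSession

object NoSynchronizationNeeded {
    def main(args: Array[String]): Unit = {
        val sc = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val counter = sc.longAccumulator("records-seen")

        // Each of the 8 partitions gets its own local copy of the accumulator,
        // so tasks never contend on a shared instance and add() needs no locking.
        sc.parallelize(1 to 1000, numSlices = 8).foreach(_ => counter.add(1))

        // The merged value is only available on the driver, after the action.
        println(counter.value) // 1000
    }
}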

Gummite answered 30/11/2018 at 11:46
