Why does Spark throw "SparkException: DStream has not been initialized" when restoring from checkpoint?

I am restoring a stream from an HDFS checkpoint (ConstantInputDStream, for example), but I keep getting SparkException: <X> has not been initialized.

Is there something specific I need to do when restoring from a checkpoint?

I can see that it wants DStream.zeroTime set, but when the stream is restored zeroTime is null. It doesn't get restored, possibly because it is a private member; I don't know. I can see that the StreamingContext referenced by the restored stream does have a value for zeroTime.

initialize is a private method and gets called by StreamingContext.graph.start but not by StreamingContext.graph.restart, presumably because it expects zeroTime to have been persisted.

Does someone have an example of a stream that recovers from a checkpoint and has a non-null value for zeroTime?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Antonia answered 29/1, 2016 at 17:9

The problem was that I created the dstreams after the StreamingContext had been recreated from the checkpoint, i.e. after StreamingContext.getOrCreate. Creating the dstreams and all transformations should have been done inside createStreamingContext.

The issue was filed as [SPARK-13316] "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards.
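A minimal sketch of the corrected structure (the socket host/port and the foreachRDD body are placeholders, not from the original code); everything that builds the dstream graph lives inside createStreamingContext:

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    // Create the dstreams and all transformations here, so the graph
    // is part of the checkpoint and can be restored on recovery.
    val socketStream = ssc.socketTextStream("localhost", 9999) // placeholder host/port
    socketStream.checkpoint(Seconds(1))
    socketStream.foreachRDD(rdd => rdd.foreach(println)) // placeholder action
    ssc
}

// getOrCreate restores the full graph from the checkpoint,
// or builds it fresh via createStreamingContext on the first run.
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
ssc.start()
ssc.awaitTermination()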

Antonia answered 5/2, 2016 at 11:27
Can you give some example of how you got around the problem? I am unable to get the receiver stream object from the checkpointed StreamingContext. – Lombardy
Can you give the corrected code, so that we can understand the fix? – Ardussi

This exception may also occur when you try to use the same checkpoint directory for two different Spark Streaming jobs.

Use a unique checkpoint directory for each Spark job.
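For example, a sketch of deriving a per-job checkpoint path from the application name, so no two jobs can ever share one (the base path is a placeholder):

// Sketch: each job gets its own checkpoint directory, keyed by app name.
val appName = sparkConf.get("spark.app.name")
val checkpointDir = s"hdfs:///checkpoints/$appName" // placeholder base path
ssc.checkpoint(checkpointDir)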

Mockingbird answered 24/11, 2017 at 7:33

ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FlatMappedDStream@6c17c0f8 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)


The above error occurred because I also had another Spark job writing to the same checkpoint directory. Even though that other job was not running, the fact that it had written to the checkpoint directory meant the new Spark job was not able to configure the StreamingContext.

I deleted the contents of the checkpoint directory and resubmitted the Spark job, and the issue was resolved.
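If you want to clear a stale checkpoint directory programmatically rather than by hand, a sketch using the Hadoop FileSystem API (the path is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the stale checkpoint directory before resubmitting.
val checkpointPath = new Path("hdfs:///checkpoints/my-streaming-job") // placeholder path
val fs = FileSystem.get(checkpointPath.toUri, new Configuration())
if (fs.exists(checkpointPath)) {
    fs.delete(checkpointPath, true) // true = recursive
}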

Alternatively, you can just use a separate checkpoint directory for each Spark job, to keep it simple.

Centurial answered 11/5, 2018 at 0:32
