Spark Streaming checkpoint recovery is very, very slow
  • Goal: Read from Kinesis and store the data into S3 in Parquet format via Spark Streaming.
  • Situation: The application runs fine initially, running batches of 1 hour with an average processing time of less than 30 minutes. Suppose the application crashes for some reason and we try to restart from the checkpoint: the processing now takes forever and does not move forward. We tested the same thing at a batch interval of 1 minute; the processing runs fine and a batch takes about 1.2 minutes to finish. When we recover from the checkpoint, each batch takes about 15 minutes.
  • Notes: We are using S3 for checkpoints, with 1 executor, 19 GB of memory and 3 cores per executor (see the configuration sketch just below).
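For reference, a minimal sketch of the executor sizing from the notes above, expressed as SparkConf properties (the property names are standard Spark settings; in practice these may equally be passed as spark-submit flags):

import org.apache.spark.SparkConf

// Sketch only: the resources mentioned in the notes, as configuration properties.
val sizingConf = new SparkConf()
  .set("spark.executor.instances", "1")   // single executor
  .set("spark.executor.memory", "19g")    // 19 GB per executor
  .set("spark.executor.cores", "3")       // 3 cores per executor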

Attaching the screenshots:

First Run - Before checkpoint Recovery:

Before checkpoint - Streaming Page

Before checkpoint - Jobs Page

Before checkpoint - Jobs Page 2

Trying to Recover from checkpoint:

After checkpoint - Streaming Page

After checkpoint - Jobs Page

Config.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.joda.time.DateTimeZone

object Config {

  val sparkConf = new SparkConf()


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}

S3Basin.scala

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dstream.DStream

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
    streams.foreachRDD(jsonRDDRaw =>{
      println(s"Old partitions ${jsonRDDRaw.partitions.length}")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions ${jsonRDD.partitions.length}")

      if(!jsonRDD.isEmpty()){
        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

        sqlContext.read.json(jsonRDD.map(f => {
          val str = new String(f)
          // If the record is wrapped as {"message": "...", "@version": ...},
          // keep only the inner payload; otherwise use the raw string.
          if (str.startsWith("{\"message\"")) {
            str.substring(11, str.indexOf("@version") - 2)
          }
          else{
            str
          }
        })).registerTempTable("events")

        sqlContext.sql(
          """
            |select
            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |*
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)


        sqlContext.dropTempTable("events")
      }
    })
  }
}

Kinesis.scala

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kinesis.KinesisUtils

object Kinesis {


  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentials: BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole){
      new AmazonKinesisClient()
    }
    else{
      awsCredentials = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"), sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentials)
    }


    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")


    val kinesisStreams = (0 until Config.numStreams).map { i =>
        println(s"creating stream for $i")
        if(Config.useIAMInstanceRole){
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

        }else{
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2, awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

        }
      }

    val unionStreams = streamingContext.union(kinesisStreams)
    streamFunc(unionStreams)

    streamingContext
  }


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {

    val sc = Config.getSparkContext

    if (sc.defaultParallelism < Config.numStreams + 1) {
      throw new Exception(s"Number of streams = ${Config.numStreams}, default parallelism = ${sc.defaultParallelism}")
    }

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))


//    sys.ShutdownHookThread {
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    }
//

    streamingContext.start()
    streamingContext.awaitTermination()


  }




}

DAG (screenshot)

Petroleum answered 15/7, 2016 at 7:43 Comment(17)
Images are great, but can we see your code? Preferably your Spark DAG. – Felt
@YuvalItzchakov added code and DAG – Petroleum
This is by far the most visually interesting post I've seen on SO. Thanks for all the screenshots =D – Murtha
@Murtha any lead on where I can look for more information? – Petroleum
Have you tried checkpointing locally within the driver node? If you've ever uploaded a large file to S3 from EC2, you'll know it's kind of slow. That may be your bottleneck. – Murtha
Yup, tried doing a local checkpoint, didn't help. Tried updating streaming.blockInterval to 60s to reduce the number of tasks, that didn't help either. – Petroleum
Does this happen even with a very small amount of data to recover? – Bouffard
Yup, even if I fall back by 5 mins, the 5 batches take 35-40 mins to process. Then the regular batches start working normally. – Petroleum
Furthermore: can you tell us what happens at line 19? (5th image) – Bouffard
You mean isEmpty? It's there as part of S3Basin.scala: if(!jsonRDD.isEmpty()){... – Petroleum
Where are the screenshots? – Boxwood
@amit_kumar Screenshots of what? The DAG & execution are there as part of the question. – Petroleum
@interfector did you find any solution to this one? – Petroleum
@GauravShah, I can only see blank space and no images below "First Run - Before checkpoint Recovery". – Boxwood
@amit_kumar I have no problem seeing the images. It may be a local browser problem on your end. Back to the problem: have you checked whether your server is swapping? Even with 19G of memory it might be doing something unexpected. – Hautrhin
@GauravShah Unfortunately no, we're still looking for alternatives. – Hunnicutt
Created a Jira issue: issues.apache.org/jira/browse/SPARK-19304 – Petroleum
Raised a Jira issue: https://issues.apache.org/jira/browse/SPARK-19304

The issue is that we read more data per iteration than is required and then discard it. This can be avoided by adding a limit to the Kinesis getRecords AWS call.

Fix: https://github.com/apache/spark/pull/16842
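As a rough illustration only (the actual fix lives inside Spark's Kinesis receiver, in the PR above, not in user code), capping a GetRecords call with the plain AWS SDK v1 client looks roughly like this; the GetRecordsLimitSketch object, its fetchBounded method and the parameters are made up for the sketch, while the SDK calls are the standard ones:

import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest}

object GetRecordsLimitSketch {
  // Fetch at most `maxRecords` from one shard instead of everything behind
  // the iterator, which is the idea behind the fix above.
  def fetchBounded(client: AmazonKinesisClient, streamName: String,
                   shardId: String, maxRecords: Int): Int = {
    val iterator = client.getShardIterator(
      new GetShardIteratorRequest()
        .withStreamName(streamName)
        .withShardId(shardId)
        .withShardIteratorType("TRIM_HORIZON"))
      .getShardIterator

    val result = client.getRecords(
      new GetRecordsRequest()
        .withShardIterator(iterator)
        .withLimit(maxRecords))   // the cap that keeps a single fetch from over-reading

    result.getRecords.size()
  }
}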

Petroleum answered 1/3, 2017 at 8:24 Comment(2)
@interfector you might be interested in this one – Petroleum
Awesome work going to the extent of fixing it, @gaurav-shah!! – Seigneur
When a failed driver is restarted, the following occurs:

  1. Recover computation – The checkpointed information is used to restart the driver, reconstruct the contexts and restart all the receivers.
  2. Recover block metadata – The metadata of all the blocks that will be necessary to continue the processing will be recovered.
  3. Re-generate incomplete jobs – For the batches with processing that has not completed due to the failure, the RDDs and corresponding jobs are regenerated using the recovered block metadata.
  4. Read the block saved in the logs – When those jobs are executed, the block data is read directly from the write ahead logs. This recovers all the necessary data that were reliably saved to the logs.
  5. Resend unacknowledged data – The buffered data that was not saved to the log at the time of failure will be sent again by the source, as it had not been acknowledged by the receiver.

Since all these steps are performed at the driver, your batch of 0 events takes so much time. This should happen with the first batch only; after that things will be normal.

Reference here.
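For orientation, a stripped-down sketch of where these steps run: they all happen on the driver, inside StreamingContext.getOrCreate, before the first post-restart batch can execute. The checkpoint path below is a placeholder, and the write-ahead-log property from step 4 is optional for the Kinesis receiver on Spark 1.5+, as discussed in the comments:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

object RecoverySketch {
  val checkpointDir = "s3://my-bucket/checkpoints"   // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("recovery-sketch")
      // Step 4's write-ahead log; optional when the source can replay data itself.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Minutes(60))
    ssc.checkpoint(checkpointDir)
    // ... define streams and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart this reconstructs the context, block metadata and pending jobs
    // from the checkpoint: the slow, driver-side part described above.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}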

Boxwood answered 13/9, 2016 at 16:38 Comment(10)
Correct, we know that it happens only on the batches that need to be recovered via the checkpoints (not just the first one), and we also discovered that it happens on the driver, but how to solve this problem is the question. – Petroleum
Checkpointing is really slow; have you tried Kryo serialization? Also consider using Datasets databricks.com/blog/2016/01/04/…, which have faster ser/deser with Encoders. – Boxwood
Slow is relative. Checkpointing needs to serialize data to the checkpointing location, which is done by the receivers (extra work). After recovery all operations on the data are re-computed, and this can involve network traffic since the data source is now your checkpointing location. Match your numbers with the comments on this Spark JIRA: issues.apache.org/jira/browse/… – Boxwood
We don't need to enable the WAL for Kinesis from Spark 1.5; it can use Kinesis as the source and recover from that. The checkpoint contains only metadata, which is not expensive to write. No ser/de, no data writing, only metadata: issues.apache.org/jira/browse/SPARK-9215 – Petroleum
Which Spark version are you using? Please check whether issues.apache.org/jira/browse/… is relevant, as your DAG looks similar, in case you are on a lower version. – Boxwood
Thanks @amit_kumar, I did try the same thing on Spark 2 but am having a similar issue. – Petroleum
I'm running Spark 2.0 and using Datasets - I see the exact same problem. – Lumberyard
@GlennieHellesSindholt I think we should create a Jira issue for Spark, they should be able to help. – Petroleum
@GauravShah Did you create a Jira issue? I'm also interested in following this ticket. The Streaming tab appears about 45 minutes after the restart of the streaming job, and after that the 1h batches take longer and longer, leading to an OOM. Because I don't hit an OOM when I'm not restarting from the checkpoint, I wonder what happens under the hood. – Gandhi
I also see "HDFSBackedStateStoreProvider: The state for version 71086 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed... Note that this is normal for the first batch of starting query." on recovery, which can take a while. It makes sense if it needs to recover its state from a snapshot and some deltas, but it is pretty slow and can put the query in a bad state given the wrong trigger length. – Kilter
I had similar issues before; my application kept getting slower and slower.

Try to release memory after you are done with an RDD by calling rdd.unpersist(): https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)

Or set spark.streaming.backpressure.enabled to true (see the sketch below):

http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval

http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements

Also, check your locality settings; maybe too much data is moving around.
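A minimal sketch of the suggestions above (the property names are standard Spark settings; `stream` and the locality value are placeholders to adapt):

import org.apache.spark.SparkConf

// Sketch only: enable backpressure and tune how long Spark waits for data-local tasks.
val tunedConf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")   // throttle receivers to what we can process
  .set("spark.locality.wait", "3s")                      // locality wait before falling back to non-local tasks

// Release cached blocks once each micro-batch is done.
// `stream` stands for your DStream (e.g. the union of the Kinesis streams).
stream.foreachRDD { rdd =>
  // ... process rdd ...
  rdd.unpersist(blocking = false)
}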

Garrek answered 20/1, 2017 at 7:0 Comment(1)
The application is taking time only in recovery, not in regular processing. rdd.unpersist will help if I am running out of memory, but that is not the case. Backpressure is useful if I am not able to consume data as fast as it comes in, but I actually can. – Petroleum
