Running multiple Spark Streaming jobs of different DStreams in parallel
I have a Spark Streaming application that reads data from multiple Kafka topics. Each topic has a different type of data, and thus requires a different processing pipeline.

My initial solution was to create one DStream per topic:

def main(args: Array[String]): Unit = {
    val streamingContext: StreamingContext = ...
    val topics = ...

    for (topic <- topics) {
        val offsets: Map[TopicAndPartition, Long] = ...
        val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
        configureStream(topic, stream)
    }

    streamingContext.addStreamingListener(new StreamingListener {
        override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
            // logic to save offsets after each batch completes
        }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
}


def configureStream(topic: String, stream: DStream[...]): Unit = {
    topic match {
        case "first" => stream.map(...).foreachRDD(...)
        case "second" => stream.map(...).foreachRDD(...)
        case "third" => stream.map(...).foreachRDD(...)
        // ...
    }
}

When running the application, the processing jobs are computed one after another, even though they belong to different DStreams.

I tried adjusting the spark.streaming.concurrentJobs parameter (as stated here), but that's when things got weird:

  • The first batch processed more data than usual (expected, since data accumulates in Kafka while the streaming application is down), so its processing time exceeded the batch interval.
  • The second batch was added to the queue (the first batch was still running) and started processing immediately.
  • The second (and sometimes even the third) batch finished before the first batch did.

This can cause problems, for example when managing Kafka offsets: the streaming listener receives the offsets of the second/third batch first (because those batches finish first) and saves them. If the application then crashes before finishing the first batch, that batch's data is lost. Conversely, if the first batch does finish and the application crashes afterwards, the data from the second/third batch gets replayed.
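For reference, this is roughly how the setting was applied; the app name and batch interval below are placeholders, and note that spark.streaming.concurrentJobs is an undocumented, internal configuration:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: values > 1 let the job scheduler run multiple output jobs at once,
// but with the out-of-order batch completion effects described above.
val conf = new SparkConf()
  .setAppName("multi-topic-streaming") // placeholder name
  .set("spark.streaming.concurrentJobs", "4")
val streamingContext = new StreamingContext(conf, Seconds(10)) // placeholder interval
```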

Is there a way to tell Spark to process the jobs in parallel without processing new batches? Or, perhaps, process different DStreams in parallel (i.e., jobs within one DStream are processed linearly; across different DStreams in parallel)?

Laclos answered 2/4, 2017 at 11:8 Comment(5)
Did you find anything on the subject?Algar
Did you find the solution?Hepza
No, I had to change approach. I now have a dedicated streaming application per Kafka topic.Laclos
I feel a dedicated streaming application per topic is the only option for this. I also faced the same problem, so I created a fixed-size thread pool and a dedicated thread for each topic.Lifesaving
@hayat, did you get any benefit from the thread pool? Do you have a sample of how exactly you did it?Amoral
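The thread-pool approach mentioned in the comments can be sketched as follows. This is a minimal, Spark-free sketch: the topic names are assumptions, and the body of each task stands in for creating and running a dedicated StreamingContext per topic.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

object PerTopicThreads {
  // Hypothetical topic names; in a real app each would get its own StreamingContext.
  val topics = Seq("first", "second", "third")
  val processed = new ConcurrentLinkedQueue[String]()

  def runAll(): Unit = {
    // One dedicated thread per topic, so the topics' pipelines run in parallel.
    val pool = Executors.newFixedThreadPool(topics.size)
    topics.foreach { topic =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          // Placeholder for: build StreamingContext, start(), awaitTermination().
          processed.add(topic)
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}
```

Each topic still processes its own batches sequentially inside its thread, which is exactly the per-stream ordering the question asks for.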

It is not possible to do this with DStreams.

Spark Structured Streaming solves this very issue.

You can check out this answer for more info.
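To illustrate, here is a minimal Structured Streaming sketch; the topic names, broker address, and checkpoint paths are assumptions. Each topic gets its own independent query, and the queries run concurrently within a single SparkSession:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("multi-topic").getOrCreate()

// One input stream and one independent query per topic; Structured Streaming
// schedules the queries' micro-batches concurrently, and each query tracks
// its own Kafka offsets in its checkpoint directory.
for (topic <- Seq("first", "second", "third")) {
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
    .option("subscribe", topic)
    .load()

  df.selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("console") // replace with the real per-topic sink
    .option("checkpointLocation", s"/tmp/checkpoints/$topic")
    .start()
}

spark.streams.awaitAnyTermination()
```

Because offsets are checkpointed per query, the offset-management problem from the question (batches of different streams finishing out of order) does not arise.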

Billat answered 6/10, 2023 at 5:42 Comment(0)
