I have a Spark Streaming application that reads data from multiple Kafka topics. Each topic has a different type of data, and thus requires a different processing pipeline.
My initial solution was to create one DStream per topic:
def main(args: Array[String]) {
  val streamingContext: StreamingContext = ...
  val topics = ...

  // one direct Kafka stream per topic, each starting from its own saved offsets
  for (topic <- topics) {
    val offsets: Map[TopicAndPartition, Long] = ...
    val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
    configureStream(topic, stream)
  }

  streamingContext.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
      // logic to save offsets after each batch completes
    }
  })

  streamingContext.start()
  streamingContext.awaitTermination()
}
def configureStream(topic: String, stream: DStream[...]) {
  topic match {
    case "first"  => stream.map(...).foreachRDD(...)
    case "second" => stream.map(...).foreachRDD(...)
    case "third"  => stream.map(...).foreachRDD(...)
    // ...
  }
}
When I run the application, the processing jobs are executed one after another, even though they belong to different DStreams.
I tried adjusting the spark.streaming.concurrentJobs parameter (as stated here; a sketch of how I set it appears after the list below), but that's when things got weird:
- The first batch processed more data than usual (expected, since data accumulates in Kafka while the streaming application is down), and its processing time exceeded the batch interval.
- The second batch was then added to the queue (while the first batch was still running) and started processing immediately.
- The second (and sometimes even the third) batch had finished before the first batch did.
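For reference, here is roughly how I set the parameter when building the context. This is just a minimal sketch, assuming the context is created from a SparkConf; the app name, the value 4, and the 10-second batch interval are placeholders from my setup, not recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("multi-topic-streaming")           // placeholder app name
  .set("spark.streaming.concurrentJobs", "4")    // allow up to 4 jobs to run concurrently
val streamingContext = new StreamingContext(sparkConf, Seconds(10))  // placeholder batch interval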
This out-of-order completion causes problems, for example with Kafka offset management: the streaming listener receives the offsets of the second/third batch first (because it finishes first) and saves them. If the application crashes before the first batch finishes, that batch's data is lost. Conversely, if the first batch does finish and the application crashes afterwards, the data from the second/third batch gets replayed.
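To make this concrete, the listener's offset bookkeeping is roughly along these lines. It is only a sketch: it assumes the direct Kafka stream publishes its OffsetRange list under the "offsets" key of each batch's input info metadata, and saveOffsets stands in for my persistence logic:

import org.apache.spark.streaming.kafka.OffsetRange
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

streamingContext.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // each input DStream reports the offset ranges it read for this batch
    val offsetRanges = batchCompleted.batchInfo.streamIdToInputInfo.values
      .flatMap(_.metadata.get("offsets"))
      .flatMap(_.asInstanceOf[List[OffsetRange]])
    saveOffsets(offsetRanges)  // hypothetical helper that persists topic/partition -> untilOffset
  }
})

Because the listener only sees batches in completion order, whichever batch finishes first gets its offsets persisted first.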
Is there a way to tell Spark to process the jobs in parallel without starting new batches before the current one finishes? Or, alternatively, to process different DStreams in parallel (i.e., jobs within one DStream are processed sequentially, while jobs from different DStreams run in parallel)?