I am using Spark Structured Streaming to consume events from Kafka and upload them to S3.
Checkpoints are committed on S3:
DataStreamWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
The offsets are committed to Kafka via a StreamingQueryListener:
kafkaConsumer.commitSync(topicPartitionMap);
Once the application starts, it retrieves the offset map from Kafka and starts the stream:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
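For reference, the Kafka source documents `startingOffsets` as a JSON string of the form `{"topic1":{"0":23,"1":42}}`, so the offset map presumably has to be serialized before it is passed in. A minimal sketch of that serialization, assuming the map has the shape topic -> (partition -> offset); the class and method names are hypothetical and not part of my application:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class StartingOffsetsJson {

    // Hypothetical helper: serializes topic -> (partition -> offset) into the
    // JSON layout documented for the Kafka source's "startingOffsets" option.
    // Assumes topic names need no JSON escaping (Kafka restricts them to
    // letters, digits, '.', '_' and '-').
    public static String toJson(Map<String, Map<Integer, Long>> offsets) {
        // TreeMap gives a stable, sorted ordering of topics.
        return new TreeMap<String, Map<Integer, Long>>(offsets).entrySet().stream()
            .map(t -> "\"" + t.getKey() + "\":" + partitionsToJson(t.getValue()))
            .collect(Collectors.joining(",", "{", "}"));
    }

    // Serializes one topic's partitions; partition numbers become string keys.
    private static String partitionsToJson(Map<Integer, Long> partitions) {
        return new TreeMap<Integer, Long>(partitions).entrySet().stream()
            .map(p -> "\"" + p.getKey() + "\":" + p.getValue())
            .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Long>> offsets =
            Map.of("topic1", Map.of(0, 23L, 1, 42L));
        System.out.println(toJson(offsets)); // prints {"topic1":{"0":23,"1":42}}
    }
}
```

The resulting string would be passed as `.option("startingOffsets", json)`. In that JSON, an offset of -2 means earliest and -1 means latest. Note that, per the Spark documentation, `startingOffsets` only applies when a new query is started; a query that resumes from an existing checkpoint picks up from where it left off.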
I store the topic/partition/offset with the data in the ORC files.
The data contains multiple duplicates of events with the exact same topic/partition/offset.
How should the stream be configured to achieve exactly-once processing?
StreamingQueryListener, see here – Crain