Spark structured streaming exactly once - Not achieved - Duplicated events

I am using Spark Structured Streaming to consume events from Kafka and upload them to S3.

Checkpoints are stored on S3:

DataStreamWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(Trigger.ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");

The offsets are committed to Kafka via a StreamingQueryListener:

  kafkaConsumer.commitSync(topicPartitionMap);
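
The post doesn't show the listener itself, so the following is only a sketch of how it might look: it parses the end offsets reported in each progress event (a single Kafka source is assumed) and commits them synchronously. The class name, the injected KafkaConsumer, and the Jackson-based parsing are assumptions, not code from the post.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.spark.sql.streaming.StreamingQueryListener;

    public class KafkaOffsetCommitListener extends StreamingQueryListener {

        // Hypothetical consumer created with the same group id as the application
        private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
        private final ObjectMapper mapper = new ObjectMapper();

        public KafkaOffsetCommitListener(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
        }

        @Override
        public void onQueryStarted(QueryStartedEvent event) { }

        @Override
        public void onQueryTerminated(QueryTerminatedEvent event) { }

        @Override
        public void onQueryProgress(QueryProgressEvent event) {
            try {
                // endOffset() is a JSON string such as {"topic1":{"0":123,"1":456}};
                // a single Kafka source is assumed here.
                JsonNode root = mapper.readTree(event.progress().sources()[0].endOffset());
                Map<TopicPartition, OffsetAndMetadata> topicPartitionMap = new HashMap<>();
                Iterator<String> topics = root.fieldNames();
                while (topics.hasNext()) {
                    String topic = topics.next();
                    JsonNode partitions = root.get(topic);
                    Iterator<String> parts = partitions.fieldNames();
                    while (parts.hasNext()) {
                        String partition = parts.next();
                        topicPartitionMap.put(
                                new TopicPartition(topic, Integer.parseInt(partition)),
                                new OffsetAndMetadata(partitions.get(partition).asLong()));
                    }
                }
                kafkaConsumer.commitSync(topicPartitionMap);
            } catch (Exception e) {
                // Committing back to Kafka is informational only; the checkpoint stays the source of truth.
                e.printStackTrace();
            }
        }
    }

Such a listener would be registered once with sparkSession.streams().addListener(new KafkaOffsetCommitListener(kafkaConsumer)) before the query is started.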

Once the application is started, it retrieves the offset map from Kafka and starts the stream:

 reader = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)

I store the topic/partition/offset with the data in the ORC files.
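
For reference, the Kafka source exposes topic, partition and offset as columns on every row, so a projection such as the following sketch (the variable names are assumptions) keeps them next to the payload written to ORC:

    // Sketch: select the Kafka metadata columns alongside the decoded payload.
    Dataset<Row> input = reader.load()
            .selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS value");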

The data contains multiple duplicates of events with the exact same topic/partition/offset.

How should the stream be configured to achieve exactly-once processing?

Crain asked 21/3, 2019 at 14:17 Comment(7)
Why write Spark code instead of using an existing service that other companies already run in production? github.com/pinterest/secor – Aggressive
Multiple reasons: 1. You need to manage the schema in order to write ORC with Secor. 2. We have our own business logic. 3. We are already using a Spark cluster. – Crain
You can also check this project: github.com/YotpoLtd/metorikku – Crain
Are you open to considering a system other than Apache Spark? – Halonna
I tested Secor and Kafka Connect; those two are not suitable for me. – Crain
Why and how are you committing offsets to Kafka after processing? By default, Spark Structured Streaming stores commits and offsets in the checkpoint. And how are you getting topicPartitionMap here? – Active
You can use a StreamingQueryListener, see here. – Crain

Found out that these parameters should be set to true: spark.streaming.driver.writeAheadLog.closeFileAfterWrite and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.

Set this to 'true' when you want to use S3 for the metadata WAL

https://spark.apache.org/docs/latest/configuration.html
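
A minimal sketch of setting the two flags when building the SparkSession; the application name is a placeholder and not part of the original answer:

    // Sketch: enable both close-after-write flags on the session's configuration.
    SparkSession spark = SparkSession.builder()
            .appName("kafka-to-s3")   // placeholder application name
            .config("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
            .config("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
            .getOrCreate();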

More details here: https://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read

Crain answered 28/3, 2019 at 11:19 Comment(0)
