Force Confluent s3 sink to flush

I set up the Kafka Connect S3 sink with the duration set to 1 hour and a rather large flush count, say 10,000. If there are not many messages in the Kafka topic, the S3 sink buffers them in memory, waits until they accumulate to the flush count, then uploads them together and commits the offsets to its own consumer group.

But consider this situation: I only send 5,000 messages to the topic, so the S3 sink never flushes. After a long time, those 5,000 messages will eventually be evicted from Kafka because of the retention time, yet they are still only in the S3 sink's memory, not in S3. This is very dangerous: if the S3 sink is restarted, or the machine running it crashes, those 5,000 messages are lost, and we cannot recover them from Kafka because they have already been deleted.

Can this happen with the S3 sink? Or is there a setting that forces it to flush after some time?
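
For reference, a rough sketch of the kind of connector configuration described above, assuming the 1-hour duration refers to the TimeBasedPartitioner's partition.duration.ms; the topic, bucket, and region names are only placeholders, and converter/worker settings are omitted:

# hypothetical connector config illustrating the setup in the question
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=events
s3.bucket.name=my-sink-bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
timestamp.extractor=Record
# 1-hour partition duration and a large flush count, as described above
partition.duration.ms=3600000
flush.size=10000

With only flush.size and a partition duration set, nothing forces an upload until 10,000 records have accumulated for a given output partition.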

Pereyra answered 8/6, 2018 at 13:42 Comment(2)
Look at the scheduled rotation ms property – Sybil
Thanks, I checked, that is it. – Pereyra

If your stream from Kafka to S3 does not have a constant flow of records, you may use the property

rotate.schedule.interval.ms

to flush records at scheduled intervals.
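
As a sketch only (the 10-minute value is illustrative, not a recommendation), adding the following to a configuration like the one in the question forces a flush based on wall-clock time even when flush.size has not been reached; note that this rotation strategy expects timezone to be configured:

# flush open files at most every 10 minutes of wall-clock time
rotate.schedule.interval.ms=600000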

Note that if this option is used, your downstream system should be able to cope with duplicates in the case of reprocessing. That is because flushing records based on wall-clock time might result in duplicates appearing in different files if the connector has to re-export records from Kafka.

As a side note, if you use the property:

rotate.interval.ms

with the Wallclock timestamp extractor (timestamp.extractor=Wallclock), your records will be flushed without setting rotate.schedule.interval.ms. But this means that your partitioner depends on wall-clock time, and therefore you should be able to account for duplicate records.
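
A sketch of that alternative, again with an illustrative interval:

# rotate a file once its first record is older than the interval,
# measured with the (non-deterministic) wall-clock extractor
timestamp.extractor=Wallclock
rotate.interval.ms=600000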

The connector is able to offer exactly-once delivery on a constant stream of records with deterministic partitioners, and it has various timestamp extractors, such as one that depends on the record's timestamp (Record) or on a timestamp field within the record (RecordField).
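
A sketch of such a deterministic combination (interval value illustrative):

# deterministic rotation based on record timestamps, which keeps
# exactly-once guarantees; rotate.schedule.interval.ms is left unset
timestamp.extractor=Record
rotate.interval.ms=600000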

Configuration properties for partitioning here

Dagon answered 13/6, 2018 at 15:5 Comment(2)
Why would this happen: "if the connector is scheduled to re-export records from Kafka"? I'd like to flush to S3 on a time interval, and I don't see how to avoid duplicate records. – Cultivation
I found this in the docs docs.confluent.io/kafka-connect-s3-sink/current/… - To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees). – Mansized
