Apache kafka - consumer delay option
Asked Answered
A

2

4

I want to start a consumer in Kafka for a particular topic in a small delay. In detail, I want the consumer to start consuming the messages from the topic after a particular time delay from the time of producing the messages.

Is there any property or option in Kafka to enable it?

Adigun answered 10/10, 2017 at 11:45 Comment(1)
As soon as the producer sends the message in Kafka it goes into the Kafka logs and becomes available for Consumer to consume it. I do not see any property in Kafka Documentation which gives you an option to delay the consumption with respect to every message (I would be happy to know more about it if there is one). However you have control over consumption of messages so you can start your consumer a bit late as compared to producer.Openminded
F
0

We did the same stuff for spark-streaming. I hope, the approach can suits you also.

The idea is very simple - use Thread.sleep. When you receive new message from kafka, you can calculate how long do you need to sleep before process it.

pseudocode for idea:

message = getNextMessageFromKafka()
sleepMs = Math.max(0, currentTime - message.timestamp)
Thread.sleep(speepMs)
do processing
Firmin answered 10/10, 2017 at 17:53 Comment(2)
The idea is perfectly suiting the problem I feel like what if for every 1 ms interval the topic is enriched with some data and the consumer consumes and the thread sleeps for every 1ms data so this might go on right and the other processing logic will not be called ryte !!!!Adigun
This approach is good maybe for a class example, but not for a production application.Stav
S
0

If you are using reactive Kafka consumer, you may apply Flux#delaySequence(Duration) to the Kafka consumer get a small delay for each message.

Shift this Flux forward in time by a given Duration. Elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on the parallel Scheduler, but empty sequences or immediate error signals are not delayed.

With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s. .

enter image description here

Your code might look something like this:

import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

public class KafkaService<EVENT> {
    private static final Duration KAFKA_CONSUMER_DELAY = Duration.ofSeconds(1);

    @Autowired
    private ReactiveKafkaConsumerTemplate<UUID, EVENT> consumerTemplate;

    @EventListener(value = ApplicationReadyEvent.class)
    public void listen() {
        consumerTemplate
            .receiveAutoAck()

            .delaySequence(KAFKA_CONSUMER_DELAY)

            .concatMap(consumerRecord -> handleEvent(consumerRecord.key(), consumerRecord.value())
                // onErrorResume is important so the consumer doesn't stop consuming on error
                .onErrorResume(throwable -> {
                    log.error("Error during processing event {} [{}]", consumerRecord.value().getClass().getSimpleName(), consumerRecord, throwable);
                    return Mono.empty();
                }
            )
        )
        .subscribe();
    }

    private Mono<EVENT> handleEvent(UUID key, EVENT event) {
        ....
    }
}
Stav answered 15/8, 2023 at 9:41 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.