If you are using a reactive Kafka consumer, you can apply Flux#delaySequence(Duration) to the consumer's Flux to introduce a small delay for each message. The operator's documentation describes it as follows:
Shift this Flux forward in time by a given Duration. Elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on the parallel Scheduler, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
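As a quick illustration of that behavior, here is a minimal standalone sketch (not Kafka-specific; the 100 ms interval, the 1 s delay, and the class name are arbitrary choices for the demo). The source keeps its rate, and only the start of the sequence is shifted:

import java.time.Duration;
import java.time.Instant;

import reactor.core.publisher.Flux;

public class DelaySequenceDemo {

    public static void main(String[] args) {
        Flux.interval(Duration.ofMillis(100))      // source emits every 100 ms (10 Hz)
            .take(10)
            .delaySequence(Duration.ofSeconds(1))  // whole sequence is shifted forward by 1 s
            .doOnNext(i -> System.out.println(Instant.now() + " -> " + i))
            .blockLast();                          // block only so the demo JVM stays alive
    }
}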
Your code might look something like this:
import java.time.Duration;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j // Lombok logger; assumes Lombok is on the classpath (the original uses an undeclared "log")
public class KafkaService<EVENT> {

    private static final Duration KAFKA_CONSUMER_DELAY = Duration.ofSeconds(1);

    @Autowired
    private ReactiveKafkaConsumerTemplate<UUID, EVENT> consumerTemplate;

    @EventListener(value = ApplicationReadyEvent.class)
    public void listen() {
        consumerTemplate
                .receiveAutoAck()
                // shift the whole record sequence forward by the configured delay
                .delaySequence(KAFKA_CONSUMER_DELAY)
                .concatMap(consumerRecord -> handleEvent(consumerRecord.key(), consumerRecord.value())
                        // onErrorResume is important so the consumer doesn't stop consuming on error
                        .onErrorResume(throwable -> {
                            log.error("Error during processing event {} [{}]",
                                    consumerRecord.value().getClass().getSimpleName(), consumerRecord, throwable);
                            return Mono.empty();
                        }))
                .subscribe();
    }

    private Mono<EVENT> handleEvent(UUID key, EVENT event) {
        // ...
    }
}
Once a message is produced, it goes into the Kafka log and becomes available for the consumer to consume. I do not see any property in the Kafka documentation that gives you an option to delay consumption of every message (I would be happy to know more about it if there is one). However, you do have control over when consumption starts, so you can start your consumer a bit later than the producer. – Openminded
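If you go the route of starting the consumer later than the producer, one possible way to do it with Spring Kafka is sketched below. This is an assumption on my part, not something from the comment above: the listener id "delayedListener", the topic name "my-topic", and the 10-second delay are made up for illustration. The idea is to declare the listener with autoStartup disabled and start it through the KafkaListenerEndpointRegistry once your delay has elapsed.

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

@Component
public class DelayedStartConsumer {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // The listener is declared but not started automatically.
    @KafkaListener(id = "delayedListener", topics = "my-topic", autoStartup = "false")
    public void onMessage(String message) {
        // handle the message
    }

    // Start the listener 10 seconds after the application is ready (delay is illustrative).
    @EventListener(ApplicationReadyEvent.class)
    public void startConsumerLater() {
        Executors.newSingleThreadScheduledExecutor().schedule(
                () -> registry.getListenerContainer("delayedListener").start(),
                10, TimeUnit.SECONDS);
    }
}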