Currently, Spring does not support Reactive Kafka directly (or supports it only partially), so you should use Reactor Kafka. Spring has no annotation for Reactive Kafka such as @ReactiveKafkaListener. Instead, you can use the KafkaSender/KafkaReceiver classes for publishing/consuming messages.
Use this dependency:

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>
@Bean
public <K, V> KafkaSender<K, V> kafkaSender(ObservationRegistry registry) {
    var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.collectionToCommaDelimitedString(kafkaProperties.getBootstrapServers()));
    var senderOptions = SenderOptions.<K, V>create(properties)
            .withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
    return KafkaSender.create(senderOptions);
}

@Bean
public <K, V> KafkaReceiver<K, V> kafkaReceiver(ObservationRegistry registry) {
    var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.collectionToCommaDelimitedString(kafkaProperties.getBootstrapServers()));
    var receiverOptions = ReceiverOptions.<K, V>create(properties)
            .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
            .subscription(topics);
    return KafkaReceiver.create(receiverOptions);
}
Note that send and receive are instance methods, so you call them on the injected beans, not statically:

kafkaSender.send(outbound.map(r -> senderRecord(r)))
        .doOnNext(result -> processResponse(result))
        .doOnError(e -> processError(e));

Flux<ReceiverRecord<Integer, String>> inboundFlux = kafkaReceiver.receive();
These snippets will not work as-is, because nothing subscribes to the reactive pipeline. You should subscribe once, when your application is ready, using @PostConstruct, @EventListener (for example on ApplicationReadyEvent), or a startup hook such as CommandLineRunner, to start consuming messages.
@PostConstruct
public void init() {
    consume()
            .onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
            .subscribe();
}

public Flux<ReceiverRecord<Integer, String>> consume() {
    return kafkaReceiver.receive();
}
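As an alternative to @PostConstruct, a minimal sketch using @EventListener on ApplicationReadyEvent, which fires only after the whole context is ready (the class name, topic types, and acknowledge-on-next processing here are assumptions for illustration):

```java
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.KafkaReceiver;

@Component
public class KafkaConsumerStarter {

    private final KafkaReceiver<Integer, String> kafkaReceiver;

    public KafkaConsumerStarter(KafkaReceiver<Integer, String> kafkaReceiver) {
        this.kafkaReceiver = kafkaReceiver;
    }

    // Subscribe once, after the application context is fully started.
    // This is slightly later (and safer) than @PostConstruct, which runs
    // before the context is guaranteed to be ready.
    @EventListener(ApplicationReadyEvent.class)
    public void startConsuming() {
        kafkaReceiver.receive()
                // acknowledge each record so its offset can be committed
                .doOnNext(record -> record.receiverOffset().acknowledge())
                .subscribe();
    }
}
```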
One caveat: with plain Reactor Kafka you cannot configure Kafka purely from configuration files (application.yaml/application.properties). If you want more control, you can use another dependency:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
This dependency transitively includes

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>
It also brings in Spring for Apache Kafka, which lets you configure Kafka from your configuration files.
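For illustration, a minimal application.yaml sketch of the kind of configuration the Spring Cloud Stream binder enables (the function name process, the topic names, and the broker address are assumptions, not values from the original answer):

```yaml
spring:
  cloud:
    function:
      definition: process        # name of the Function/Consumer/Supplier bean
    stream:
      bindings:
        process-in-0:
          destination: input-topic
        process-out-0:
          destination: output-topic
      kafka:
        binder:
          brokers: localhost:9092
```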
But instead of using KafkaSender/KafkaReceiver directly, you can then use ReactiveKafkaProducerTemplate/ReactiveKafkaConsumerTemplate.
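A sketch of how those templates are typically used; both classes live in org.springframework.kafka.core.reactive, while the config maps, topic name, and key/value types here are assumptions for illustration:

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderResult;

public class ReactiveTemplatesSketch {

    // producerProps/consumerProps are plain Kafka config maps
    // (bootstrap servers, (de)serializers, group.id, ...).

    Mono<SenderResult<Void>> sendOne(Map<String, Object> producerProps) {
        var producer = new ReactiveKafkaProducerTemplate<Integer, String>(
                SenderOptions.create(producerProps));
        // send(topic, value) completes when the broker acknowledges the record
        return producer.send("demo-topic", "hello");
    }

    Flux<String> consume(Map<String, Object> consumerProps) {
        var consumer = new ReactiveKafkaConsumerTemplate<Integer, String>(
                ReceiverOptions.<Integer, String>create(consumerProps)
                        .subscription(List.of("demo-topic")));
        // receiveAutoAck() emits ConsumerRecords and commits offsets automatically
        return consumer.receiveAutoAck().map(ConsumerRecord::value);
    }
}
```

As before, these fluxes do nothing until something subscribes to them.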
Comments:
- There is ReactiveKafkaConsumerTemplate in the Spring for Apache Kafka project: github.com/spring-projects/spring-kafka/blob/master/… – Hesperus
- A reactive @KafkaListener is on the road map for next year. Right now, all we have is the lightweight wrapper that Artem mentioned. That said, managing partition offsets for a reactive (or any async) consumer is particularly difficult. – Earvin
- Has the reactive @KafkaListener arrived yet? I am unable to find it. – Breadwinner