Implement Reactive Kafka Listener in Spring Boot application

I'm trying to implement a reactive Kafka consumer in my Spring Boot application, and I'm looking at these examples: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java

It looks like there is no Spring support for Reactor Kafka yet.

I understand how Kafka listeners work with the non-reactive Kafka API in Spring: the simplest solution is to configure beans for ConcurrentKafkaListenerContainerFactory and ConsumerFactory, then use the @KafkaListener annotation, and voilà.

But I'm not sure how to properly use Reactor Kafka in Spring right now.

Basically, I need a listener for a topic. Should I create some kind of loop or scheduler of my own? Or maybe I'm missing something. Can anyone share their knowledge and best practices?

Untraveled answered 1/12, 2020 at 19:24 Comment(5)
Did you see the ReactiveKafkaConsumerTemplate in the Spring for Apache Kafka project: github.com/spring-projects/spring-kafka/blob/master/… ? - Hesperus
Reactive support for @KafkaListener is on the road map for next year. Right now, all we have is the lightweight wrapper that Artem mentioned. That said, managing partition offsets for a reactive (or any async) consumer is particularly difficult. - Earvin
@ArtemBilan thanks for the link, will look into that. - Untraveled
@GaryRussell has the reactive support for @KafkaListener arrived yet? I am unable to find it. - Breadwinner
No, sorry. Contributions are always welcome. - Earvin

Spring currently does not support Reactive Kafka directly, or at least not fully. You should use Reactor Kafka (reactor-kafka) itself.

Spring has no annotation for Reactive Kafka, such as a hypothetical @ReactiveKafkaListener.

You can use the KafkaSender and KafkaReceiver classes for publishing and consuming messages, respectively.

Use this dependency:

<dependency>
  <groupId>io.projectreactor.kafka</groupId>
  <artifactId>reactor-kafka</artifactId>
</dependency>

    @Bean
    public <K, V> KafkaSender<K, V> kafkaSender(ObservationRegistry registry) {
        // kafkaProperties: an injected Spring Boot KafkaProperties field (not shown)
        var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.collectionToCommaDelimitedString(kafkaProperties.getBootstrapServers()));

        var senderOptions = SenderOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
        return KafkaSender.create(senderOptions);
    }

    @Bean
    public <K, V> KafkaReceiver<K, V> kafkaReceiver(ObservationRegistry registry) {
        var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.collectionToCommaDelimitedString(kafkaProperties.getBootstrapServers()));

        // topics: the topic names to subscribe to (field not shown)
        var receiverOptions = ReceiverOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
                .subscription(topics);

        return KafkaReceiver.create(receiverOptions);
    }

// send() and receive() are instance methods: call them on the beans defined above
kafkaSender.send(outbound.map(r -> senderRecord(r)))
           .doOnNext(result -> processResponse(result))
           .doOnError(e -> processError(e));

Flux<ReceiverRecord<Integer, String>> inboundFlux =
    kafkaReceiver.receive();

On their own these pipelines do nothing: a Flux is lazy, so no records are sent or received until something subscribes to it. That's why you should subscribe once, when your application is ready.
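The laziness can be seen without Kafka at all. The sketch below is a JDK-only analogy, not Reactor Kafka code: it uses java.util.concurrent.Flow, the JDK's copy of the Reactive Streams interfaces, and the names LazyPipelineDemo, receive and subscribeAndCollect are made up for illustration. It only shows that building a cold publisher performs no work until a subscriber requests items.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

public class LazyPipelineDemo {

    // Side effects are recorded here so we can see when work really happens.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for kafkaReceiver.receive(): a cold publisher that
    // does nothing until a subscriber arrives and requests items.
    static Flow.Publisher<String> receive(List<String> records) {
        return subscriber -> {
            Iterator<String> it = records.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean finished;

                @Override
                public void request(long n) {
                    // Emit at most n records, then complete once the source is drained.
                    for (long i = 0; i < n && it.hasNext() && !finished; i++) {
                        String record = it.next();
                        events.add("polled " + record);
                        subscriber.onNext(record);
                    }
                    if (!it.hasNext() && !finished) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        };
    }

    // Subscribes with unbounded demand and collects everything that is emitted.
    static List<String> subscribeAndCollect(Flow.Publisher<String> publisher) {
        List<String> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("error " + error);
            }

            @Override
            public void onComplete() {
                events.add("completed");
            }
        });
        return received;
    }

    public static void main(String[] args) {
        Flow.Publisher<String> inbound = receive(List.of("a", "b"));
        System.out.println(events);                       // [] : nothing polled yet
        System.out.println(subscribeAndCollect(inbound)); // [a, b]
        System.out.println(events);                       // [polled a, polled b, completed]
    }
}
```

The same holds for the Flux returned by receive(): assigning it to a variable or returning it from a method only describes the pipeline, and the actual polling starts at subscribe().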

To start consuming, subscribe from a startup hook such as @PostConstruct or @EventListener, or from a class like CommandLineRunner or an ApplicationEvent listener.

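    @PostConstruct
    public void init() {
        consume()
                .onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
                .subscribe();
    }

    // receive() is an instance method and emits ReceiverRecord<K, V>, not Void;
    // String keys and values are an assumption here.
    public Flux<ReceiverRecord<String, String>> consume() {
        return kafkaReceiver.receive();
    }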

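One caveat: with reactor-kafka alone, the sender and receiver are not configured automatically from the application configuration (application.yaml/application.properties); you have to build their options yourself.

If you want more control, you can use another dependency: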

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
    </dependency>

This dependency includes

<dependency>
  <groupId>io.projectreactor.kafka</groupId>
  <artifactId>reactor-kafka</artifactId>
</dependency>

It also brings in Spring's Kafka support, which lets you configure Kafka from the application configuration (application.yaml/application.properties).
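As an illustration of what such configuration might look like, here is a minimal application.yaml fragment using the standard Spring Boot spring.kafka.* properties, which is what a KafkaProperties object is populated from. The broker address and group id are placeholders, not values from this answer.

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092   # placeholder: a local broker is assumed
    consumer:
      group-id: demo-group              # placeholder consumer group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```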

With it, instead of using KafkaSender/KafkaReceiver directly, you can use ReactiveKafkaProducerTemplate/ReactiveKafkaConsumerTemplate.
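A minimal sketch of the consumer side with ReactiveKafkaConsumerTemplate could look like this. It assumes Spring Boot 3.2 or later with spring-kafka and reactor-kafka on the classpath, a consumer group id set under spring.kafka.consumer.group-id, and String keys and values; the class names and the topic "demo-topic" are placeholders invented for the example.

```java
import java.util.List;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.ReceiverOptions;

@Configuration
public class ReactiveConsumerConfig {

    // Placeholder topic name, not taken from the answer.
    private static final String TOPIC = "demo-topic";

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> consumerTemplate(KafkaProperties kafkaProperties) {
        // Consumer settings come from spring.kafka.* in the application configuration.
        var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
        var options = ReceiverOptions.<String, String>create(properties)
                .subscription(List.of(TOPIC));
        return new ReactiveKafkaConsumerTemplate<>(options);
    }
}

@Component
class DemoTopicListener {

    private final ReactiveKafkaConsumerTemplate<String, String> template;

    DemoTopicListener(ReactiveKafkaConsumerTemplate<String, String> template) {
        this.template = template;
    }

    // Subscribe once, after the application has fully started.
    @EventListener(ApplicationReadyEvent.class)
    public void start() {
        template.receiveAutoAck()
                .doOnNext(record -> System.out.println(record.key() + " -> " + record.value()))
                .subscribe();
    }
}
```

receiveAutoAck() acknowledges records for you, which keeps the sketch short; if you need to control when offsets are committed, receive() returns ReceiverRecord instances whose receiverOffset() you acknowledge yourself. Note that an unhandled error terminates the subscription, so a real listener would also need an error and retry strategy.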

Goetz answered 20/4, 2024 at 18:26 Comment(0)

I don't have a ready solution yet, but I'm trying this (Kotlin code, Spring Boot). Someone published part of this code snippet here: https://github.com/reactor/reactor-kafka/issues/100

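@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        // concatMap subscribes to the inner Mono; inside doOnNext it would only
        // be created and never run. processMyEvent is assumed to return a Mono.
        .concatMap { record ->
            val myEvent = record.value()
            processMyEvent(myEvent)
                // acknowledge only after processing has completed successfully
                .doOnSuccess { record.receiverOffset().acknowledge() }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}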

Look into other Stack Overflow questions. There is not much there, but maybe it will give you some ideas.

Gurglet answered 31/12, 2020 at 8:9 Comment(2)
Can you share a link to Reactor Kafka examples or tutorials? - Teufert
This will not work. @EventListener is called while the application is starting, but it fails because the Kafka connection is not ready yet. - Goetz
