How to make a Spring Cloud Stream consumer in a WebFlux application?
I have a WebFlux-based microservice that has a simple reactive repository:

    public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
    }

Now I would like to extend this microservice to consume event messages from Kafka. Each message/event will then be saved into the database.

For the Kafka listener, I used Spring Cloud Stream. I created a simple Consumer, and it works: I'm able to consume the message and save it into the database.

    @Bean
    public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
        return input ->
                input.foreach((key, value) -> {
                    LOG.info("Received event, Key: {}, value: {}", key, value);
                    repository.save(initNotification(value)).subscribe();
                });
    }

But is this the correct way to connect a Spring Cloud Stream consumer to a reactive repository? It doesn't feel right that I have to call subscribe() at the end.

I read the Spring Cloud Stream documentation (for the 3.0.0 release), and it says:

> Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.

They also mention in a presentation video that reactive programming is supported via Project Reactor. So I guess there is a way; I just don't know it. Can you show me how to do it right?

I apologize if this sounds naive, but I'm very new to Spring Cloud Stream and reactive programming and haven't found many articles describing this.

Blastogenesis answered 15/12, 2020 at 15:43 Comment(0)

Just use Flux as the consumed type, something like this:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
    return input ->
            input
                // extract the value you need from the payload, e.g.:
                .map(message -> message.getPayload().getEventValue())
                // concatMap saves the events sequentially, preserving order
                .concatMap(value -> repository.save(initNotification(value)))
                .subscribe();
}

If you use a Function with an empty return type (Function<Flux<Message<Event>>, Mono<Void>>) instead of a Consumer, the framework can subscribe automatically. With a Consumer you have to subscribe manually, because the framework has no reference to the stream. Note that in the Consumer case you are subscribing not just to the repository save, but to the whole stream, which is fine.
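For illustration, the Function variant described above might look like this (a sketch reusing the same Event, Notification, initNotification and repository as in the answer; the key difference is that then() collapses the stream into a Mono<Void> the framework can subscribe to):

```java
@Bean
public Function<Flux<Message<Event>>, Mono<Void>> documents(NotificationRepository repository) {
    return input ->
            input
                .map(message -> message.getPayload().getEventValue())
                .concatMap(value -> repository.save(initNotification(value)))
                // then() discards the elements and signals completion as Mono<Void>;
                // no manual subscribe() is needed, the framework subscribes for you
                .then();
}
```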

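For completeness, the binding for a function-style consumer is configured under the bean name with the `-in-0` suffix. A minimal sketch, assuming the bean is named `documents` as above (the destination/topic name `events` is an assumption, not from the original answer):

```yaml
spring:
  cloud:
    function:
      definition: documents
    stream:
      bindings:
        documents-in-0:
          destination: events   # Kafka topic name (assumed)
```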
Maleficence answered 16/12, 2020 at 8:41 Comment(4)
Thank you for the reply, this is exactly what I need. But I tried it and unfortunately it didn't work: the message is consumed but never saved into the database. Did you mean to use flatMap instead of doOnNext? – Blastogenesis
Sorry, concatMap or flatMap is what you need; I've modified the answer. – Maleficence
Is there a GitHub repo for this code? I am trying to find the right configuration to get this working with an Avro record instead of the Event object. – Corvette
You can generate classes from the Avro schema instead of using the record class. You can find a working example (Kafka/RabbitMQ) using an Avro schema at github.com/zlaval/spring-cloud-stream-course – Maleficence