I have a WebFlux-based microservice with a simple reactive repository:
public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
}
Now I would like to extend this microservice to consume event messages from Kafka. Each message/event will then be saved into the database.
For the Kafka listener I used Spring Cloud Stream. I created a simple Consumer and it works great: I'm able to consume the messages and save them into the database.
@Bean
public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
    return input ->
        input.foreach((key, value) -> {
            LOG.info("Received event, Key: {}, value: {}", key, value);
            repository.save(initNotification(value)).subscribe();
        });
}
But is this the correct way to connect a Spring Cloud Stream consumer to a reactive repository? It doesn't feel right that I have to call subscribe() at the end.
I read the Spring Cloud Stream documentation (for the 3.0.0 release), which says:
Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.
They also mention in this presentation video that reactive programming is supported via Project Reactor. So I guess there is a way; I just don't know it. Can you show me how to do it right?
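Based on that, my guess is that the reactive way would look something like the sketch below, but I'm not sure it's right. I'm assuming the binder can hand me the whole stream as a Flux<Event>, that the framework subscribes to the returned Mono<Void> itself (so I don't call subscribe()), and that this would mean using the plain Kafka binder rather than the Kafka Streams binder. initNotification is the same mapping helper as in my working consumer above.

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class NotificationConsumerConfig {

    // My guess at a reactive consumer: take the whole stream as a Flux,
    // save each event through the reactive repository, and return Mono<Void>
    // so that (I assume) the framework subscribes to the pipeline itself.
    @Bean
    public Function<Flux<Event>, Mono<Void>> documents(NotificationRepository repository) {
        return events -> events
                .flatMap(event -> repository.save(initNotification(event)))
                .then();
    }
}

Is something like this the intended approach, or is there a better way to wire the consumer to the reactive repository?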
I apologize if this all sounds too stupid, but I'm very new to Spring Cloud Stream and reactive programming and haven't found many articles describing this.