How to iterate Flux and mix with Mono
Asked Answered
P

1

8

I have a use case when I should send email to the users. First I create email body.

Mono<String> emailBody = ...cache();

And then I select users and send the email to them:

Flux.fromIterable(userRepository.findAllByRole(Role.USER))
            .map(User::getEmail)
            .doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
            .subscribe();

What I don't like

  1. Without cache() method emailBody Mono calculates in each iteration step.
  2. To get emailBody value I use emailBody.block() but maybe there's a reactive way and not call block method inside Flux flow?
Priming answered 14/10, 2017 at 11:23 Comment(0)
I
15

There are several issues in this code sample. I'll assume that this is a reactive web application.

First, it's not clear how you are creating the email body; are you fetching things from a database or a remote service? If it is mostly CPU bound (and not I/O), then you don't need to wrap that into a reactive type. Now if it should be wrapper in a Publisher and the email content is the same for all users, using the cache operator is not a bad choice.

Also, Flux.fromIterable(userRepository.findAllByRole(Role.USER)) suggest that you're calling a blocking repository from a reactive context.

You should never do heavy I/O operations in a doOn*** operator. Those are made for logging or light side-effects operations. The fact that you need to .block() on it is another clue that you'll block your whole reactive pipeline.

Last one: you should not call subscribe anywhere in a web application. If this is bound to an HTTP request, you're basically triggering the reactive pipeline with no guarantee about resources or completion. Calling subscribe triggers the pipeline but does not wait until it's complete (this method returns a Disposable).

A more "typical" sample of that would look like:

Flux<User> users = userRepository.findAllByRole(Role.USER);
String emailBody = emailContentGenerator.createEmail();


// sendEmail() should return Mono<Void> to signal when the send operation is done
Mono<Void> sendEmailsOperation = users
     .flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
     .then();

// something else should subscribe to that reactive type,
// you could plug that as a return value of a Controller for example

If you're somehow stuck with blocking components (the sendEmail one, for example), you should schedule those blocking operations on a specific scheduler to avoid blocking your whole reactive pipeline. For that, look at the Schedulers section on the reactor reference documentation.

Impala answered 16/10, 2017 at 8:6 Comment(5)
Thanks for the answer. It's very useful to understand doOn***. Email body is generated by fetching new items from DB and make some sort/group operation so it's Mono<String>. How would you modify sendEmailsOperation in this case?Priming
sendEmail should probably take a Mono<String> or better, a Flux<DataBuffer> as an argument in that case.Impala
sendEmail - 3d party API which expects Java types. Also you can not just call Mono inside Flux like user.map(u-> sendEmail(emailBodyMono)) because someone should start emailBodyMono flow. I do it by calling block() method.Priming
if you're calling blocking I/O APIs all around, then there's little benefit in using WebFlux. In the end, most of your code will need to be scheduled on special schedulers.Impala
I understand it but we have much legacy Java blocking libraries. And I'm trying to mix reactive code with old as it's better then use old code everywhere.Priming

© 2022 - 2024 — McMap. All rights reserved.