Is it possible to start Mono's in parallel and aggregate the result
Asked Answered
P

2

35

I know it is possible to chain Mono's, for ex,...

Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());

This will chain and resultBMono will run when resultAMono returns....

So my question is, is it possible to start 2 Mono's in parallel and when both returns continue with another Mono?

I think it will look something like this...

Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);

but I have no idea this will run in Parallel or what can I do this to run in parallel...

Thx for answers....

Phototypy answered 9/1, 2018 at 16:27 Comment(0)
S
69

2 semantics, 1 way to make them run in parallel

The two options I present below both need some additional tuning to make A and B Mono run in parallel: namely, each Mono should use subscribeOn(Scheduler) to get out of the common thread from where they're merged.

If you only care about the completion of A and B

Use when to listen for A and B completion and then to continue with a completely different Mono:

Mono.when(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .then(Mono.just("A and B finished, I don't know their value"));

If you care about A and B values

Use zip + map/flatMap depending on what you want to do with the result.

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .map(tuple2 -> new Foo(tuple2.getT1(), tuple2.getT2(), "bar");

or

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .flatMap(tuple2 -> fetchMoreDataAsMono(tuple2.getT1(), tuple2.getT2()));

then will ignore the previous data, so it wouldn't make much sense to use zip before it.

also, zip will result in an empty Mono if one of A or B is empty! Use switchIfEmpty/defaultIfEmpty to protect against that case.

Suwannee answered 10/1, 2018 at 8:47 Comment(3)
++ for the subscribeOn(Scheduler) hint!Story
For parallel execution myMono.subscribeOn(Schedulers.boundedElastic());Padgett
Why would you need to switch the scheduler? If both Monos do not block the thread they are started within ns/ms of each other and run in parallel afaik.Tabriz
J
1

Also if your publishers return the same generic type, then you can use Flux.merge() which subscribes to provided publishers eagerly.

From docs:

Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.

If you don't care about results of publishers provided, you can use then() to continue with another Mono when the previous are completed:

Flux.merge(mono1, mono2)
        .then(anotherMono())
Joella answered 20/2, 2023 at 11:52 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.