What is the proper way to wait till all Mono responses are returned from downstream APIs

I'm quite new to Mono and Flux, and I'm trying to join several downstream API responses in a traditional blocking application. I don't want to collect a list of Mono; I want a List of the payloads that those Monos emit. However, the 'result' returned to the controller sometimes contains only some, or none, of the downstream API responses. What is the correct way to do this? I've read several posts, including "How to iterate Flux and mix with Mono", which states:

You should not call subscribe anywhere in a web application. If this is bound to an HTTP request, you're basically triggering the reactive pipeline with no guarantee about resources or completion. Calling subscribe triggers the pipeline but does not wait until it's complete.

Should I be using CompletableFuture?

In my service I attempted:

var result = new ArrayList<List<X>>();
List<Mono<X>> monoList = apiCall();
Flux.fromIterable(monoList)
            .flatMap(m -> m.doOnSuccess(
                        x -> {  
                            result.add(x.getData());
                        }
        )).subscribe();

I also attempted the following in the controller, but the method returns without waiting for the subscription to complete:

var result = new ArrayList<List<X>>();
        Flux.concat(
                this.service.callApis(result, ...)
        ).subscribe();
        return result;

In my service

public Mono<Void> callApis(List<List<X>> result, ..) {
...
return Flux.fromIterable(monoList)
                .flatMap(m -> m.doOnSuccess(
                        x -> {  
                            result.add(x.getData()...);
                        }
                )).then();
Parsimony answered 4/6, 2022 at 23:11 Comment(2)
It really depends on the bigger context of your code. Is it a WebFlux app? Is it a traditional blocking application? Either way, instead of filling an external list, it's better to use the collectList operator if you really need all the results together. - Faircloth
No, it's a traditional blocking application. As for collectList, I don't want to collect a list of Mono; I want a List of the payloads returned from the downstream APIs, which I fetch from the Mono. - Parsimony

The Project Reactor documentation (which is very good) has a section called "Which operator do I need?". You need to create a Flux from your API calls, combine the results, and then return to the synchronous world.

In your case, it looks like all your downstream services have the same API, so they all return the same type and it doesn't really matter what order those responses appear in your application. Also, I'm assuming that apiCall() returns a List<Mono<Response>>. You probably want something like

Flux.fromIterable(apiCall()) // Flux<Mono<Response>>
    .flatMap(mono -> mono) // Flux<Response>
    .map(response -> response.getData()) // Flux<List<X>>
    .collectList() // Mono<List<List<X>>>
    .block(); // List<List<X>>

The fromIterable(...).flatMap(x->x) construct just converts your List<Mono<R>> into a Flux<R>.

map() is used to extract the data part of your response.

collectList() creates a Mono that waits until the Flux completes, and gives a single result containing all the data lists.

block() subscribes to the Mono returned by the previous operator, and blocks until it is complete, which will (in this case) be when all the Monos returned by apiCall() have completed.
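
To make the steps above concrete, here is a minimal, self-contained sketch. It assumes reactor-core is on the classpath; the Response class and the body of apiCall() are hypothetical stand-ins for the downstream types in the question, with artificial delays so the Monos complete at different times.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectAllResponses {

    // Hypothetical stand-in for the downstream response type.
    static final class Response {
        private final List<String> data;
        Response(List<String> data) { this.data = data; }
        List<String> getData() { return data; }
    }

    // Hypothetical stand-in for apiCall(): three Monos that finish at different times.
    static List<Mono<Response>> apiCall() {
        return List.of(
            Mono.just(new Response(List.of("a", "b"))).delayElement(Duration.ofMillis(30)),
            Mono.just(new Response(List.of("c"))).delayElement(Duration.ofMillis(10)),
            Mono.just(new Response(List.of("d", "e"))).delayElement(Duration.ofMillis(20)));
    }

    // Returns only after every Mono has completed.
    static List<List<String>> fetchAll() {
        return Flux.fromIterable(apiCall())   // Flux<Mono<Response>>
            .flatMap(mono -> mono)            // Flux<Response>, subscribed concurrently
            .map(Response::getData)           // Flux<List<String>>
            .collectList()                    // Mono<List<List<String>>>
            .block();                         // List<List<String>>
    }

    public static void main(String[] args) {
        List<List<String>> all = fetchAll();
        System.out.println(all.size() + " payloads collected: " + all);
    }
}
```

Because flatMap emits results as they arrive, the order of the inner lists is not guaranteed to match the order of apiCall(). No external list is mutated, so nothing can be read before the pipeline has finished.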

There are many possible alternatives here, and which is most suitable will depend on your exact use case.

Trilbie answered 5/6, 2022 at 13:11 Comment(1)
Thank you for your detailed response. I ended up using .toFuture() instead of block(), as I was trying to avoid the use of block(). Thanks again. - Parsimony
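
For reference, a rough sketch of what the toFuture() variant might look like, again assuming reactor-core on the classpath and a hypothetical apiCall() that already returns the payload lists. Note that toFuture() subscribes and returns a CompletableFuture straight away, so a blocking caller still has to wait on it with join() or get() before reading the list.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectWithFuture {

    // Hypothetical stand-in for the downstream calls; each Mono emits one payload list.
    static List<Mono<List<String>>> apiCall() {
        return List.of(
            Mono.just(List.of("a", "b")).delayElement(Duration.ofMillis(20)),
            Mono.just(List.of("c")).delayElement(Duration.ofMillis(10)));
    }

    // Starts the pipeline and hands back a future; this method itself does not wait.
    static CompletableFuture<List<List<String>>> fetchAllAsync() {
        return Flux.fromIterable(apiCall())
            .flatMap(mono -> mono)
            .collectList()
            .toFuture();
    }

    public static void main(String[] args) {
        // join() is where the calling thread actually waits for all responses.
        List<List<String>> all = fetchAllAsync().join();
        System.out.println(all.size() + " payloads collected: " + all);
    }
}
```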
