Can you Flux.zip a mono and a flux and and repeat the mono value for every flux value?
Asked Answered
A

2

4

Is it possible to do something like the code below? I have one service that makes an API call and another one which returns a stream of values. I need to modify every value by the value returned by the API call.

return Flux.zip(
                     someMono.get(),
                     someFlux.Get(),
                     (d, t) -> {
                         //HERE D IS ALWAYS THE SAME AND T IS EVERY NEW FLUX VALUE
                     });

I've tried with .repeat() for the Mono and it works, but it's calling the method every time there's a new Flux value and it's a API call, so it's not good.

Is it possible?

Arsenault answered 14/1, 2020 at 22:56 Comment(1)
Simply add a .cache() before your .repeat() to have the underlying HTTP call only fired once. See the answer from @rahul-kushwaha for an example.Judejudea
S
11

You can accomplish this using cache operator. Uncomment the Flux without cache and you will see the number of calls made to getNum == 100. With cache it will be 1.

public class RepeatableMono {

  private static AtomicInteger numberOfCalls = new AtomicInteger(0);

  static Integer getNum() {
    System.out.println("GetNum Called: " + numberOfCalls.incrementAndGet());
    return 0;
  }

  public static void main(String[] args) {
    // This will print `GetNum Called: ` 100 times.
    //Flux<Integer> neverEndingFlux = Mono.defer(() -> Mono.just(getNum()))
    // .repeat();

    // This will print `GetNum Called: ` 1 times.
    Flux<Integer> neverEndingFlux = Mono.defer(() -> Mono.just(getNum()))
        .cache()
        .repeat();

    Flux<Integer> stream = Flux.range(1, 100);

    Flux.zip(neverEndingFlux, stream, (x, y) -> x + " " + y)
        .subscribe(System.out::println);
  }
}

Shrapnel answered 10/5, 2020 at 7:28 Comment(1)
I'm not sure why this was downvoted. This actually answers the question in the OP, unlike the upvoted answer.Judejudea
P
6

This will illustrate how to combine a flux with a mono such that every time the flux emits, the mono is emitted as well.

Suppose you have a flux and a mono like this:

 // a flux that contains 6 elements.
 final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));

 // a mono of 1 element.
 final Mono<String> groupLabel = Mono.just("someGroupLabel");

First, I'll show you the wrong way of trying to zip the 2 which I tried, and I think other people would try:

 // wrong way - this will only emit 1 event 
 final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
         .zipWith(groupLabel);

 // you'll see that onNext() is only called once, 
 //     emitting 1 item from the mono and first item from the flux.
 wrongWayOfZippingFluxToMono
         .log()
         .subscribe();

wrong way of zipping up the flux and mono

 // this is how to zip up the flux and mono how you'd want, 
 //     such that every time the flux emits, the mono emits. 
 final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
         .flatMap(userId -> Mono.just(userId)
                 .zipWith(groupLabel));

 // you'll see that onNext() is called 6 times here, as desired. 
 correctWayOfZippingFluxToMono
         .log()
         .subscribe();

correct way of zipping up the flux and mono

Parturition answered 16/5, 2020 at 5:19 Comment(1)
This does not actually answer the question in the OP. This causes the original Mono to emit every time the Flux emits (and thus, fire an HTTP call). The desire is to have the HTTP call behind the Mono only fire once, and have the value zipped in.Judejudea

© 2022 - 2024 — McMap. All rights reserved.