Proper way to create a Flux from a list of Mono's
Asked Answered
G

1

4

Lets say I have a API operation that consumes a List of CustomObjects. For every one of those objects it calls a service method that creates a Mono. How do I create a Flux from those Mono objects in an idiomatic and therefore non-blocking way?

What I've come up with for now is this. I changed the method names to better reflect their intended purpose.

fun myApiMethod(@RequestBody customObjs: List<CustomObject>): Flux<CustomObject> {

    return Flux.create { sink ->
        customObjs.forEach {

            service.persistAndReturnMonoOfCustomObject(it).map {
                sink.next(it)
            }
        }
        sink.complete()
    }
}

Moreover do I need to subscribe to the flux to actually make it return something?

Granthem answered 14/8, 2018 at 14:25 Comment(2)
The hint @artem-bilan gave did the trick.Granthem
Isn't this code totally wrong ? presumably persistAndReturnMonoOfCustomObject does not block, and so all the foreach block is executed instantaneously and then sink.complete() is called. So all the subsequent sink.next(it) (executed only when the mono returned by persistAndReturnMonoOfCustomObject completes) have no effect ?Inept
P
10

I believe you can use concat() instead:

/**
 * Concatenate all sources provided as a vararg, forwarding elements emitted by the
 * sources downstream.
 * <p>
 * Concatenation is achieved by sequentially subscribing to the first source then
 * waiting for it to complete before subscribing to the next, and so on until the
 * last source completes. Any error interrupts the sequence immediately and is
 * forwarded downstream.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/concat.png" alt="">
 * <p>
 * @param sources The {@link Publisher} of {@link Publisher} to concat
 * @param <T> The type of values in both source and output sequences
 *
 * @return a new {@link Flux} concatenating all source sequences
 */
@SafeVarargs
public static <T> Flux<T> concat(Publisher<? extends T>... sources) {

Or merge():

/**
 * Merge data from {@link Publisher} sequences contained in an array / vararg
 * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat},
 * sources are subscribed to eagerly.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/merge.png" alt="">
 * <p>
 * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
 * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
 * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
 * another source.
 *
 * @param sources the array of {@link Publisher} sources to merge
 * @param <I> The source type of the data sequence
 *
 * @return a merged {@link Flux}
 */
@SafeVarargs
public static <I> Flux<I> merge(Publisher<? extends I>... sources) {
Pimiento answered 14/8, 2018 at 15:55 Comment(2)
merge() or concat() seems each mono is executed in a single thread, not parallelly by multiple threads.Rigid
This is not related to this question. Please, raise a separate SO thread with more info. I don't think that parallelism request should be a responsibility of these operators. More over see that concat() JavaDocs: the next source is subscribed only when a previous is finished. See a publishOn() operator for that concurrency execution feature.Pimiento

© 2022 - 2024 — McMap. All rights reserved.