how to convert a Flux<Object> list into a List<Object>
Asked Answered
D

2

7

I have a Flux and I want to convert it to List. How can I do that?

Flux<Object> getInstances(String serviceId); // Current one 
List<Object> getInstances(String serviceId); // Demanded one

Java 8 or reactive components have a prepared method to map or convert it to List ??

I should use .map()

final List<ServiceInstance> sis = convertedStringList.parallelStream()
            .map( this.reactiveDiscoveryClient::getInstances )
            // It should be converted to List<Object>
Dearing answered 10/6, 2020 at 12:27 Comment(0)
E
10

1. Make sure you want this

A fair warning before diving into anything else: Converting a Flux to a List/Stream makes the whole thing not reactive in the strict sense of the concept because you are leaving the push domain and trading it with a pull domain. You may or may not want this (usually you don't) depending on the use-case. Just wanted to leave the note.

2. Converting a Flux to a List

According to the Flux documentation, the collectList method will return a Mono<List<T>>. It will return immediately, but it's not the resulting list itself, but a lazy structure, the Mono, that promises the result will eventually be there when the sequence is completed.

According to the Mono documentation, the block method will return the contents of the Mono when it completes. Keep in mind that block may return null.

Combining both, you could use someFlux.collectList().block(). Provided that someFlux is a Flux<Object>, the result would be a List<Object>.

The block method won't return anything if the Flux is infinite. As an example, the following will return a list with two words:

Flux.fromArray(new String[]{"foo", "bar"}).collectList().block()

But the following will never return:

Flux.interval(Duration.ofMillis(1000)).collectList().block()

To prevent blocking indefinitely or for too long, you may pass a Duration argument to block, but that will timeout with an exception when the subscription does not complete on time.

3. Converting a Flux to a Stream

According to the Flux documentation, the toStream method converts a Flux<T> into a Stream<T>. This is more friendly to operators such as flatMap. Mind this simple example, for the sake of demonstration:

Stream.of("f")
    .flatMap(letter ->
                Flux.fromArray(new String[]{"foo", "bar"})
                        .filter(word -> word.startsWith(letter)).toStream())
    .collect(Collectors.toList())

One could simply use .collectList().block().stream(), but not only it's less readable, but it could also result in NPE if block returned null. This approach does not finish for an infinite Flux as well, but because this is a stream of unknown size, you can still use some operations on it before it's complete, without blocking.

Emotionalize answered 11/6, 2020 at 20:15 Comment(0)
R
0

If you want to convert Project Reactor Flux into java.util.List without blocking the thread and terminate async process, you can do it this way

Flux.fromIterable("Str1","Str2","Str3") //Flux<String>
    .collectList() //Mono<List<String>>
    .flatMap(listStrs -> {Do whatever you want with the list})

The key function is collectionList() that gather all result into single Mono<List<String>>

Note: collectionList() does not block the thread BUT introduces a form of "wait" to collect all the elements from the previous operation and return them into a list

Java Doc collectionListDesc

Rupert answered 13/11, 2023 at 15:16 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.