1. Make sure you want this
A fair warning before diving into anything else: Converting a Flux
to a List
/Stream
makes the whole thing not reactive in the strict sense of the concept because you are leaving the push domain and trading it with a pull domain. You may or may not want this (usually you don't) depending on the use-case. Just wanted to leave the note.
2. Converting a Flux to a List
According to the Flux documentation, the collectList
method will return a Mono<List<T>>
. It will return immediately, but it's not the resulting list itself, but a lazy structure, the Mono
, that promises the result will eventually be there when the sequence is completed.
According to the Mono documentation, the block
method will return the contents of the Mono when it completes. Keep in mind that block
may return null.
Combining both, you could use someFlux.collectList().block()
. Provided that someFlux
is a Flux<Object>
, the result would be a List<Object>
.
The block
method won't return anything if the Flux is infinite. As an example, the following will return a list with two words:
Flux.fromArray(new String[]{"foo", "bar"}).collectList().block()
But the following will never return:
Flux.interval(Duration.ofMillis(1000)).collectList().block()
To prevent blocking indefinitely or for too long, you may pass a Duration
argument to block
, but that will timeout with an exception when the subscription does not complete on time.
3. Converting a Flux to a Stream
According to the Flux documentation, the toStream
method converts a Flux<T>
into a Stream<T>
. This is more friendly to operators such as flatMap
. Mind this simple example, for the sake of demonstration:
Stream.of("f")
.flatMap(letter ->
Flux.fromArray(new String[]{"foo", "bar"})
.filter(word -> word.startsWith(letter)).toStream())
.collect(Collectors.toList())
One could simply use .collectList().block().stream()
, but not only it's less readable, but it could also result in NPE if block
returned null. This approach does not finish for an infinite Flux as well, but because this is a stream of unknown size, you can still use some operations on it before it's complete, without blocking.