Flux to List<Objects> without blocking

I am looking to convert a Flux to a List<Object>. I get an error if I use block(), so I need to convert without blocking calls.

Flux.from(Collection.find())

I am using reactive programming, but GraphQL expects a List<Object> and fails when a Flux is returned.

Code with block():

public List<Test> findAll() {
        return Flux.from(testCollection.find()).collectList().block();

}

Error:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-kqueue-7

Here, I need to return List<Test>, as I cannot send Flux<Test> for some reason.

Adrastus answered 31/5, 2020 at 1:40 Comment(3)
You can't. This needs more detail - there is categorically no way to convert a Flux to a Collection in a non-blocking fashion. There are ways around the issue - you could switch the Flux to a separate thread where blocking is allowed, you could call your GraphQL library in a subscriber chain, etc. - but it's difficult / impossible to tell which approach is best without a minimal reproducible example. - Treatise
If the context is Spring WebFlux, you really don't need to, because the framework subscribes to the Flux/Mono implicitly. - Amathist
Posting tips: (a) please spell-check your work; (b) please always use a capital letter for the personal pronoun "I"; (c) please refrain from adding conversational material such as greetings and thanks; (d) always use the preview pane, so you can see if your material has been rendered correctly. The generics expressions needed repair here. - Albemarle

As stated in the comments, you can't. The reactive pattern is to stay in the flow.

So,

Mono<GraphqlResponse> response = Flux.just("A", "B", "C")
  .collectList()
  .map(this::someMethod);

GraphqlResponse someMethod(List<String> abcs) {
    return graphQl.doSomething(abcs);
}
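
Applied to the question's findAll(), staying in the flow could look roughly like the sketch below. It assumes reactor-core is on the classpath; find() is a hypothetical stand-in for testCollection.find(), and whether your GraphQL layer accepts a Mono or a CompletableFuture as a return type is an assumption about your setup, not something the question confirms.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class StayInFlow {
    // Hypothetical stand-in for testCollection.find(); any Publisher<String> would do.
    static Flux<String> find() {
        return Flux.just("a", "b", "c");
    }

    // Return the Mono instead of unwrapping it; the caller or framework subscribes.
    static Mono<List<String>> findAll() {
        return Flux.from(find()).collectList();
    }

    // Alternative for callers that understand CompletableFuture but not Mono.
    static CompletableFuture<List<String>> findAllAsFuture() {
        return findAll().toFuture();
    }

    public static void main(String[] args) {
        // block() is fine here only because main runs on an ordinary thread.
        System.out.println(findAll().block());
        System.out.println(findAllAsFuture().join());
    }
}
```

Neither method blocks the thread that calls it; the list only materialises when something downstream subscribes or the future completes.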
Lockjaw answered 1/6, 2020 at 0:52 Comment(0)

The following example converts a Flux<Object> to a List<Object>, but remember that converting a Flux to a List makes the whole thing NOT reactive:

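public static void main(String[] args) {
    Flux<String> flux = Flux.just("test1", "test2", "test3");
    List<String> list = new ArrayList<>();
    // subscribe() does not wait for completion. The list is filled here only
    // because Flux.just emits synchronously on the calling thread; with an
    // asynchronous source it may still be empty when forEach runs.
    flux.collectList().subscribe(list::addAll);
    list.forEach(System.out::println);
}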
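
One caveat with this approach is timing: subscribe() returns without waiting for the Flux to complete. A minimal sketch of the difference, assuming reactor-core is on the classpath (the class and method names are made up for illustration, and delayElements is used only to simulate an asynchronous source such as a database driver):

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

public class SubscribeTiming {
    // Subscribes, then reads the list size immediately without waiting.
    static int sizeRightAfterSubscribe(boolean delayed) {
        Flux<String> flux = Flux.just("test1", "test2", "test3");
        if (delayed) {
            // Emission now happens later, on a timer thread.
            flux = flux.delayElements(Duration.ofMillis(500));
        }
        // Thread-safe list, because the callback may run on another thread.
        List<String> list = new CopyOnWriteArrayList<>();
        flux.collectList().subscribe(list::addAll);
        return list.size();
    }

    public static void main(String[] args) {
        // Synchronous source: the callback has already run when subscribe() returns.
        System.out.println(sizeRightAfterSubscribe(false));
        // Delayed source: the callback has not run yet, so the list is still empty.
        System.out.println(sizeRightAfterSubscribe(true));
    }
}
```

So the pattern works for in-memory sources, but it is not a general way to get a filled List out of a Flux.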
Pu answered 26/10, 2021 at 12:51 Comment(0)

I assume the Collection class comes from some reactive library that makes a TCP/HTTP call, and that collection.find returns a Flux/Mono/Publisher. If so, the error is not because collectList doesn't allow this; it is because you are calling block on a thread that is a NonBlocking thread. I assume collection.find publishes its elements on a thread that is an instance of NonBlocking; judging by its name, reactor-http-kqueue-7 is probably a Netty thread.

You can take a look at the BlockingSingleSubscriber.blockingGet method, which shows why:

    final T blockingGet() {
        if (Schedulers.isInNonBlockingThread()) {
            throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
        }
...

If you have to get the result on the caller thread (the one that calls Flux.from), then you can do:

Flux.from(testCollection.find())
    .publishOn(Schedulers.boundedElastic())
    .collectList()
    .block();
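
Keep in mind that the check in blockingGet looks at the thread that calls block(), so what matters is where the blocking call itself runs. Here is a rough sketch of that, assuming reactor-core is on the classpath; find() is a hypothetical stand-in for testCollection.find(), and Schedulers.parallel() stands in for the Netty event loop because its threads are also marked NonBlocking.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class BlockPlacement {
    // Hypothetical stand-in for testCollection.find().
    static Flux<String> find() {
        return Flux.just("a", "b", "c");
    }

    // Runs the inner block() on a thread of the given scheduler and reports what happened.
    static String tryBlockOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> find().collectList().block().toString())
                .subscribeOn(scheduler)
                // The inner block() throws IllegalStateException on NonBlocking threads.
                .onErrorResume(IllegalStateException.class, e -> Mono.just("rejected"))
                .block(); // outer block(): acceptable on main or a test thread
    }

    static String onParallel() {
        return tryBlockOn(Schedulers.parallel());
    }

    static String onBoundedElastic() {
        return tryBlockOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println(onParallel());       // rejected
        System.out.println(onBoundedElastic()); // [a, b, c]
    }
}
```

Wrapping the blocking call in Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()) is the usual way to move it onto a thread where blocking is permitted.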
Eastern answered 3/6, 2020 at 15:55 Comment(0)
