How to convert List<T> to Flux<T> by using Reactor 3.x
Asked Answered
C

2

9

I have a Asyn call thrift interface:

public CompletableFuture<List<Long>> getFavourites(Long userId){
    CompletableFuture<List<Long>> future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
    }
    return future;
}

Now it returns a CompletableFuture<List>. And then I call it to do some processor by using Flux.

public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random()*100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}

However, I want to get a Flux from getFavourites method and I can use it in getRecommend method.
Or, you can recommend a Flux API ,and I can convert the List<Long> recommendList to Flux<Long> recommendFlux.

Coracoid answered 24/8, 2020 at 7:53 Comment(0)
D
4

To convert a CompletableFuture<List<T>> into a Flux<T> you can use Mono#fromFuture with Mono#flatMapMany:

var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
    CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

List<T> received asynchronously in a callback can be also converted into a Flux<T> without using a CompletableFuture. You can directly use Mono#create with Mono#flatMapMany:

Flux<Long> flux = Mono.<List<Long>>create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      sink.success(list);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

Or simply using Flux#create with multiple emissions in one pass:

Flux<Long> flux = Flux.create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      list.forEach(sink::next);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
});

flux.subscribe(System.out::println);
Dagall answered 24/8, 2020 at 9:26 Comment(2)
Int the part of directly using Mono#create with Mono#flatMapMany. I cannot realize the callback . Whether do i use wrong tool? Or you just supply a pseudocode?Coracoid
@dyy.alex, Callback is an interface of class you have to create yourself. Or use the one supplied by the client library (a lib behind the client variable).Dagall
G
9

Simple solution is to use Flux.fromIterable as shown in below example

public Flux<Integer> fromListToFlux(){
    List<Integer> intList = Arrays.asList(1,2,5,7);
    return Flux.fromIterable(intList);
}

Springboot version is 3.1.0

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

Note: This is not recommended because it will not be completely reactive when you work out of the reactive pipeline, here creating a List.

Guelph answered 1/6, 2023 at 17:4 Comment(0)
D
4

To convert a CompletableFuture<List<T>> into a Flux<T> you can use Mono#fromFuture with Mono#flatMapMany:

var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
    CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

List<T> received asynchronously in a callback can be also converted into a Flux<T> without using a CompletableFuture. You can directly use Mono#create with Mono#flatMapMany:

Flux<Long> flux = Mono.<List<Long>>create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      sink.success(list);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

Or simply using Flux#create with multiple emissions in one pass:

Flux<Long> flux = Flux.create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      list.forEach(sink::next);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
});

flux.subscribe(System.out::println);
Dagall answered 24/8, 2020 at 9:26 Comment(2)
Int the part of directly using Mono#create with Mono#flatMapMany. I cannot realize the callback . Whether do i use wrong tool? Or you just supply a pseudocode?Coracoid
@dyy.alex, Callback is an interface of class you have to create yourself. Or use the one supplied by the client library (a lib behind the client variable).Dagall

© 2022 - 2025 — McMap. All rights reserved.