How to iterate over Mono<List<String>> calling Mono returning method

I'm attempting to achieve the following in the method below

  1. Get All Cars from Dealer X
  2. Create a wrapper object that stores a set of all cars and another set of all manufactures
     2a. Populate the Cars set with the cars obtained in Step 1
  3. For each Car get all of their independent manufactures
  4. Store all obtained manufactures into the wrapper object's manufactures Set
  5. Return Mono of Car and manufactures
Mono<CarAndManufactures> requestCarAndManufactures(Request req) {
final String dealerId = buildDealerId(req.getDealerRegion(), req.getDealerId());
final CarAndManufactures CarAndManufactures = new CarAndManufactures();
return webSocketClient.getCars(dealerId) //note #getCars returns a Mono
    .map(getCarsResponse -> getCarsResponse
      .getResult()
      .stream()
      .map(Car::getId)
      .collect(toSet()))
    .map(carIds -> {
      CarAndManufactures.setCars(carIds);
      return CarAndManufactures;
    })
    .flatMapMany(CarAndManufactures1 -> Flux.fromIterable(CarAndManufactures.getCars().keySet()))
    .collectList()
    .log("Existing cars")
    .flatMap(carIds -> { //This is the problem area
      carIds
          .stream()
          .map(carId -> {
            webSocketClient.getManufactures(carId) //Note getManufactures returns a Mono... This method does not look like it's ever called
                .map(getManufactureResponse -> getManufactureResponse
                    .getResult()
                    .stream()
                    .map(Manufacture::getId)
                    .collect(toSet()))
                .map(ManufactureIds -> {
                  CarAndManufactures.SetManufactures(ManufactureIds); //since the line commented on above is not called the Manufacture Set is empty
                  return CarAndManufactures;
                });
            return CarAndManufactures;
          });
          return just(CarAndManufactures);
        }
    )
    .log("Car And Manufactures"); 

}

The Set of Manufactures is always empty; it doesn't look like webSocketClient.getManufactures(carId) is ever called. I thought I might be missing a .subscribe somewhere, but since this is used by a WebFlux controller, I don't think any #subscribe calls are needed anywhere.

Edgewise answered 22/2, 2019 at 22:57 Comment(1)
Can webSocketClient.getCars(dealerId) return an empty Mono? If yes, webSocketClient#getManufactures is never called since the param carIds in the flatMap can be an empty list (notice that Flux#collectList emits an empty list if the sequence is empty). - Oarsman

I know I'm late, but maybe someone will find this question later, and I hope the answer is helpful.

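The reason webSocketClient.getManufactures(carId) is never called in your code is that nobody subscribes to the publisher it returns. The call sits inside a plain .map() of the Java Stream API, and that stream has no terminal operation, so the lambda is never even executed. Even if it were executed, the resulting Mono is discarded instead of being returned into the reactive chain, so nothing would subscribe to it.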
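
The Stream API side of this can be checked in isolation. The following is a minimal JDK-only sketch; the class and method names are illustrative and not part of the original code. It counts how often the lambda passed to Stream#map runs with and without a terminal operation.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

final class LazyStreamDemo {

    // Builds a stream pipeline but never runs it: there is no terminal operation.
    static int callsWithoutTerminalOp(List<String> carIds) {
        AtomicInteger calls = new AtomicInteger();
        carIds.stream().map(id -> {
            calls.incrementAndGet();
            return id;
        });
        return calls.get();
    }

    // Same pipeline, but collect(...) is a terminal operation, so the lambda runs per element.
    static int callsWithTerminalOp(List<String> carIds) {
        AtomicInteger calls = new AtomicInteger();
        carIds.stream().map(id -> {
            calls.incrementAndGet();
            return id;
        }).collect(Collectors.toList());
        return calls.get();
    }

    public static void main(String[] args) {
        List<String> ids = List.of("MUSTANG", "FOCUS", "FUSION");
        System.out.println(callsWithoutTerminalOp(ids)); // prints 0
        System.out.println(callsWithTerminalOp(ids));    // prints 3
    }
}
```

Adding a terminal operation alone would not fix the question's code, though: the Mono created inside the lambda would still have to be returned into the reactive chain to be subscribed.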

The reactive chain should be continuous, without any breaks. After getting the car ids and passing them to .flatMap(), you should create another Flux from those ids inside that .flatMap(), apply another .flatMap() on that Flux to request the manufactures, and finally collect them into the resulting Set.

Also, you should not declare final CarAndManufactures CarAndManufactures = new CarAndManufactures(); outside the chain and then try to fill this object from within the reactive chain. Doing so mixes imperative code with reactive code, which is not the right way to use reactive programming.

You should stay in your reactive chain all the way.

So, the way to achieve what you want is the following (I also made the code cleaner):

@Test
public void carsAndManufacturesTest() {
    String dealerId = "test dealer";

    client.getCars(dealerId)                                  // request Mono<CarsResponse>
            .map(CarsResponse::getResult)                     // getting List<Car> from response
            .map(cars ->                                      // getting ids of cars and collect them to Set<String>
                    cars.stream()
                            .map(Car::getId)
                            .collect(Collectors.toSet())
            )
            .flatMap(carsIds ->                               // creating another publisher that will fetch manufactures and build the CarAndManufactures
                    this.fetchManufacturesIds(carsIds)        // fetching Set<String> manufactures for all the carsIds
                            .map(manufacturesIds ->           // we're done here! we have both carsIds and manufactureIds!
                                    new CarAndManufactures(   // creating desired object from both carsIds and their manufacturesIds
                                            carsIds,
                                            manufacturesIds)
                            )
            )
            .doOnNext(carAndManufactures -> log.info(carAndManufactures.toString()))
            .subscribe();
}

/**
 * Fetches all the manufactures ids of given cars ids
 *
 * @param carsIds ids of cars
 * @return Mono with all the manufactures ids of given cars ids
 */
public Mono<Set<String>> fetchManufacturesIds(Set<String> carsIds) {

    return Flux.fromIterable(carsIds)                       // creating flux of given cars ids
            .flatMap(client::getManufactures)               // request Mono<ManufactureResponse> for car id
            .map(ManufactureResponse::getResult)            // getting List<Manufacture>
            .map(manufactures ->                            // getting ids of manufactures and collect them to Set
                    manufactures.stream()
                            .map(Manufacture::getId)
                            .collect(Collectors.toSet())
            )
            .collectList()                                  // collecting all the sets of manufactures ids, here we get List<Set<String>>
            .map(list ->                                    // flatting all the sets to one set, eventually we get here Set<String>
                    list.stream()
                            .flatMap(Collection::stream)
                            .collect(Collectors.toSet())
            );
}
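
The last step of fetchManufacturesIds, merging the per-car sets into one set, can be tried on its own without Reactor. This is a small JDK-only sketch with an illustrative class name and sample ids; it shows that ids returned for more than one car end up in the result only once.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class FlattenSetsDemo {

    // Merges several sets of ids into one set; ids shared between sets collapse into one entry.
    static Set<String> flatten(List<Set<String>> idsPerCar) {
        return idsPerCar.stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Set<String>> idsPerCar = List.of(
                Set.of("MF BUFFALO", "MF CHICAGO"),
                Set.of("MF CHICAGO", "MF DEARBORN"));
        System.out.println(flatten(idsPerCar).size()); // prints 3
    }
}
```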

For those who want to test this code, I'll leave the class definitions here. In the code above, just create the client as private Client client = new Client(); (as a field of the class that holds those two methods). The classes rely on Lombok annotations (@Slf4j, @Data, @AllArgsConstructor).

Classes:

/*
    ================== Example classes for those who want to test this code ===================
 */
@Slf4j
class Client {

    public Mono<CarsResponse> getCars(String dealerId) {
        log.info("Received request to fetch cars by dealer id: {}", dealerId);

        List<Car> cars =
                List.of(new Car("MUSTANG"),
                        new Car("FOCUS"),
                        new Car("FUSION")
                );
        return Mono.just(new CarsResponse(cars));
    }

    public Mono<ManufactureResponse> getManufactures(String carId) {
        log.info("Received request to fetch manufactures by car id: {}", carId);

        List<Manufacture> manufactures =
                List.of(new Manufacture("MF BUFFALO"),
                        new Manufacture("MF CHANGAN"),
                        new Manufacture("MF CHICAGO"),
                        new Manufacture("MF DEARBORN")
                );
        return Mono.just(new ManufactureResponse(manufactures));
    }
}

/*
    ================== Data classes ===================
 */
@Data
@AllArgsConstructor
class CarsResponse {

    private List<Car> result;
}

@Data
@AllArgsConstructor
class ManufactureResponse {

    private List<Manufacture> result;
}

@Data
@AllArgsConstructor
class CarAndManufactures {

    private Set<String> cars;
    private Set<String> manufactures;
}

@Data
@AllArgsConstructor
class Car {

    private String id;
}

@Data
@AllArgsConstructor
class Manufacture {

    private String id;
}
Leucopoiesis answered 10/1, 2023 at 20:52 Comment(0)
