What is a Grouped Flux and how exactly do we work with one?
Asked Answered
H

1

7

I am working on a Flux of some object lets say Flux < MovieReservation > . This contains info like movie id,name, timing, title etc. So I wanted to extract the info that can help in creating a new
Flux < MovieShowDetail > . My intent was to group all reservations by the movie id and break the Flux down into a set of smaller and multiple Flux(fluxes if that's thing at all). Something like

Flux {
   movie1 -> Flux<MovieShowDetail>
   movie2 -> Flux<MovieShowDetail>
   ... and so on
}

So I came across this groupBy method which is supposed to do something like this only. However the documentation is really out of content on this, especially on how to iterate over each movie and its respective Flux.

Moreover when I try to learn by try and error, the processing stops after working on the operation before the groupBy method.

I have tried to do

fluxOfSomething
.groupBy( movieReservation -> movieReservation.getMovieId ,
movieReservation -> movieReservation) 

so that i can iterate over each of the flux and create the new flux of MovieShowDetail. However, processing never gets in this block. I tried logging stuff but the flow never entered it.

flux
    .map( movieSomething -> do something)
    .groupBy( movieReservation -> 
        movieReservation.getMovieId , movieReservation ->
    movieReservation)
    .subscribe("This text doesn't get printed");

I really need as much info as you can give about this.

Hassan answered 7/6, 2019 at 13:3 Comment(0)
C
13

groupBy produces a Flux<Flux<T>> (or more precisely a Flux<GroupedFlux<T>>, which exposes the key of each group).

A GroupedFlux, like a Flux, must be subscribed to to become active. So what you need to do is somehow consume the inner Flux that groupBy produces.

One typical way of doing that is by using flatMap, which already takes a Function<T, Flux> transformation. The function can be as simple as Function.identity() (but if you want to further process each element in the inner Flux you should probably do so from within the flatMap Function (because the group key is in scope of that lambda).

movieReservations
    .groupBy(MovieReservation::movieId)
    .flatMap(idFlux -> idFlux
        .collectList()
        .map(listOfReservations ->
            new MovieInformation(idFlux.key(), listOfReservations)
        )
    );
Chemosmosis answered 10/6, 2019 at 13:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.