Group by object property in java flux
Asked Answered
C

2

12

Given the following data structure Data and Flux<Data> what is idiomatic way to achieve grouping into series of lists based on some property:

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

class Scratch {
    private static class Data {
        private Integer key;
        private String value;

        public Data(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }

        public static Data of(Integer key, String value) {
            return new Data(key, value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    public static void main(String[] args) {
        Flux<Data> test = Flux.just(
                Data.of(1, "Hello"),
                Data.of(1, "world"),
                Data.of(2, "How"),
                Data.of(2, "are"),
                Data.of(2, "you"),
                Data.of(3, "Bye"));
        test.bufferUntil(new Predicate<Data>() {
            Integer prev = null;
            @Override
            public boolean test(Data next) {
                boolean collect = prev != null && !Objects.equals(prev, next.getKey());
                prev = next.getKey();
                return collect;
            }
        }, true).subscribe(e -> System.out.println(e.toString()));
    }
} 

Output:

[Hello, world]
[How, are, you]
[Bye]

I am aware of groupBy function on Flux, but this gives me again a Flux, not a list. Current solution I have described above works, but it does not feel 100% idiomatic because I had to use anonymous class instead of lambda. I could have use lambda and AtomicReference outside from lambda, but that too does not feel 100% right. Any suggestions?

Constitutional answered 20/9, 2018 at 20:26 Comment(2)
You can just replace the anonymous inner class with lamda function, can't you? And you could use the groupby operator and then iterate over the flux of grouped fluxesBarbour
How would you store prev in lambda? I have no idea how to use groupby operator (in my context) :(Constitutional
R
6

You can also use collectMultimap which allows you to have Map<K, Collection<T>. In this case collectMultimap will return: Mono<Map<Integer,Collection<Data>>>:

 test.collectMultimap( Data::getKey )
     .subscribe( dataByKey -> System.out.println( dataByKey.toString() ) );

Output:

{1=[Hello, world], 2=[How, are, you], 3=[Bye]}
Ranch answered 2/12, 2020 at 20:40 Comment(2)
That does not work in our case, due to amount of data we would need to hold in memory.Constitutional
I am just blind. First link after description of my problem. Saved 2 days. Working like a charm.Diffract
B
5

Here is a solution using groupBy operator. I have grouped the data by the common key. The groupBy operator gives me a Flux of GroupedFlux. GroupedFlux is a subclass of Flux, so I apply flatMap and convert an individual groupedFlux to a List<Data> using the collectList operator. Like this, I get a Flux<List<Data>>, which I then subscribe to and print, as asked by you.

test.groupBy(Data::getKey)
                .flatMap(Flux::collectList)
                .subscribe(listOfStringsHavingDataWithSameKey -> System.out.println(listOfStringsHavingDataWithSameKey.toString()));

Do checkout the documentations for Flux and GroupedFlux.

Barbour answered 2/11, 2018 at 17:58 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.