Java Lambda Stream Distinct() on arbitrary key? [duplicate]
Asked Answered
T

9

79

I frequently ran into a problem with Java lambda expressions where when I wanted to distinct() a stream on an arbitrary property or method of an object, but wanted to keep the object rather than map it to that property or method. I started to create containers as discussed here but I started to do it enough to where it became annoying and made a lot of boilerplate classes.

I threw together this Pairing class, which holds two objects of two types and allows you to specify keying off the left, right, or both objects. My question is... is there really no built-in lambda stream function to distinct() on a key supplier of some sorts? That would really surprise me. If not, will this class fulfill that function reliably?

Here is how it would be called

BigDecimal totalShare = orders.stream().map(c -> Pairing.keyLeft(c.getCompany().getId(), c.getShare())).distinct().map(Pairing::getRightItem).reduce(BigDecimal.ZERO, (x,y) -> x.add(y));

Here is the Pairing class

    public final class Pairing<X,Y>  {
           private final X item1;
           private final Y item2;
           private final KeySetup keySetup;

           private static enum KeySetup {LEFT,RIGHT,BOTH};

           private Pairing(X item1, Y item2, KeySetup keySetup) {
                  this.item1 = item1;
                  this.item2 = item2;
                  this.keySetup = keySetup;
           }
           public X getLeftItem() { 
                  return item1;
           }
           public Y getRightItem() { 
                  return item2;
           }

           public static <X,Y> Pairing<X,Y> keyLeft(X item1, Y item2) { 
                  return new Pairing<X,Y>(item1, item2, KeySetup.LEFT);
           }

           public static <X,Y> Pairing<X,Y> keyRight(X item1, Y item2) { 
                  return new Pairing<X,Y>(item1, item2, KeySetup.RIGHT);
           }
           public static <X,Y> Pairing<X,Y> keyBoth(X item1, Y item2) { 
                  return new Pairing<X,Y>(item1, item2, KeySetup.BOTH);
           }
           public static <X,Y> Pairing<X,Y> forItems(X item1, Y item2) { 
                  return keyBoth(item1, item2);
           }

           @Override
           public int hashCode() {
                  final int prime = 31;
                  int result = 1;
                  if (keySetup.equals(KeySetup.LEFT) || keySetup.equals(KeySetup.BOTH)) {
                  result = prime * result + ((item1 == null) ? 0 : item1.hashCode());
                  }
                  if (keySetup.equals(KeySetup.RIGHT) || keySetup.equals(KeySetup.BOTH)) {
                  result = prime * result + ((item2 == null) ? 0 : item2.hashCode());
                  }
                  return result;
           }

           @Override
           public boolean equals(Object obj) {
                  if (this == obj)
                         return true;
                  if (obj == null)
                         return false;
                  if (getClass() != obj.getClass())
                         return false;
                  Pairing<?,?> other = (Pairing<?,?>) obj;
                  if (keySetup.equals(KeySetup.LEFT) || keySetup.equals(KeySetup.BOTH)) {
                         if (item1 == null) {
                               if (other.item1 != null)
                                      return false;
                         } else if (!item1.equals(other.item1))
                               return false;
                  }
                  if (keySetup.equals(KeySetup.RIGHT) || keySetup.equals(KeySetup.BOTH)) {
                         if (item2 == null) {
                               if (other.item2 != null)
                                      return false;
                         } else if (!item2.equals(other.item2))
                               return false;
                  }
                  return true;
           }

    }

UPDATE:

Tested Stuart's function below and it seems to work great. The operation below distincts on the first letter of each string. The only part I'm trying to figure out is how the ConcurrentHashMap maintains only one instance for the entire stream

public class DistinctByKey {

    public static <T> Predicate<T> distinctByKey(Function<? super T,Object> keyExtractor) {
        Map<Object,Boolean> seen = new ConcurrentHashMap<>();
        return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
    }

    public static void main(String[] args) { 

        final ImmutableList<String> arpts = ImmutableList.of("ABQ","ALB","CHI","CUN","PHX","PUJ","BWI");

        arpts.stream().filter(distinctByKey(f -> f.substring(0,1))).forEach(s -> System.out.println(s));
    }

Output is...

ABQ
CHI
PHX
BWI
Thorne answered 9/1, 2015 at 22:16 Comment(0)
H
135

The distinct operation is a stateful pipeline operation; in this case it's a stateful filter. It's a bit inconvenient to create these yourself, as there's nothing built-in, but a small helper class should do the trick:

/**
 * Stateful filter. T is type of stream element, K is type of extracted key.
 */
static class DistinctByKey<T,K> {
    Map<K,Boolean> seen = new ConcurrentHashMap<>();
    Function<T,K> keyExtractor;
    public DistinctByKey(Function<T,K> ke) {
        this.keyExtractor = ke;
    }
    public boolean filter(T t) {
        return seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
    }
}

I don't know your domain classes, but I think that, with this helper class, you could do what you want like this:

BigDecimal totalShare = orders.stream()
    .filter(new DistinctByKey<Order,CompanyId>(o -> o.getCompany().getId())::filter)
    .map(Order::getShare)
    .reduce(BigDecimal.ZERO, BigDecimal::add);

Unfortunately the type inference couldn't get far enough inside the expression, so I had to specify explicitly the type arguments for the DistinctByKey class.

This involves more setup than the collectors approach described by Louis Wasserman, but this has the advantage that distinct items pass through immediately instead of being buffered up until the collection completes. Space should be the same, as (unavoidably) both approaches end up accumulating all distinct keys extracted from the stream elements.

UPDATE

It's possible to get rid of the K type parameter since it's not actually used for anything other than being stored in a map. So Object is sufficient.

/**
 * Stateful filter. T is type of stream element.
 */
static class DistinctByKey<T> {
    Map<Object,Boolean> seen = new ConcurrentHashMap<>();
    Function<T,Object> keyExtractor;
    public DistinctByKey(Function<T,Object> ke) {
        this.keyExtractor = ke;
    }
    public boolean filter(T t) {
        return seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
    }
}

BigDecimal totalShare = orders.stream()
    .filter(new DistinctByKey<Order>(o -> o.getCompany().getId())::filter)
    .map(Order::getShare)
    .reduce(BigDecimal.ZERO, BigDecimal::add);

This simplifies things a bit, but I still had to specify the type argument to the constructor. Trying to use diamond or a static factory method doesn't seem to improve things. I think the difficulty is that the compiler can't infer generic type parameters -- for a constructor or a static method call -- when either is in the instance expression of a method reference. Oh well.

(Another variation on this that would probably simplify it is to make DistinctByKey<T> implements Predicate<T> and rename the method to eval. This would remove the need to use a method reference and would probably improve type inference. However, it's unlikely to be as nice as the solution below.)

UPDATE 2

Can't stop thinking about this. Instead of a helper class, use a higher-order function. We can use captured locals to maintain state, so we don't even need a separate class! Bonus, things are simplified so type inference works!

public static <T> Predicate<T> distinctByKey(Function<? super T,Object> keyExtractor) {
    Map<Object,Boolean> seen = new ConcurrentHashMap<>();
    return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}

BigDecimal totalShare = orders.stream()
    .filter(distinctByKey(o -> o.getCompany().getId()))
    .map(Order::getShare)
    .reduce(BigDecimal.ZERO, BigDecimal::add);
Hagiarchy answered 10/1, 2015 at 1:57 Comment(17)
Yeah thats what I was commenting on above. It does not require an intermediary collect() that only gets streamed again. I kind of like this DistinctByKey utility as it expresses intent a little more clearly. By the way, can you not use a generic static factory method to infer the type arguments?Thorne
A static factory would still need to infer T and K just like the constructor. It doesn't seem to work any better. T can probably be inferred because the filter must be T -> boolean and you have Stream<T>. But inferring K is probably causing the difficulty. Do do that, the compiler would have to take the inferred type of T, apply it to the lamda passed to the construct, which must be T -> K and then use that to infer K. I don't think the compiler does that or even if it's possible. It may be possible to get rid of the K type parameter though, hmmm....Hagiarchy
Hmm... perhaps if you use a different interface other than Function? I'll play with it. Although I hate to digress the topic, I'll post if i I have boilerplate-free solution.Thorne
@ThomasN. It still has to be a Function since it returns something. But the helper class doesn't need a type variable K; it can just use Object instead. See my update. I haven't been able to find a way to get the compiler to infer T, so my comment above is probably incorrect.Hagiarchy
@ThomasN. See update 2. It uses a completely different technique. Much simpler.Hagiarchy
Wow okay this looks pretty elegant. I'll need to view this off my phone and test it in Eclipse tomorrow morning. But if all works I think you win the grand prize.Thorne
What the helper method of the last solution does, can be done without a helper method as well. Though a helper method is reusable.Faustina
@Faustina The reusability and the name of the helper provide a lot of value.Hagiarchy
Okay, so my only question is this and maybe I'm not thinking at the level of functional abstraction you are. How is only one instance of the ConcurrentHashMap being persisted? It almost looks like a new instance of the ConcurrentHashMap is generated for each item in the stream?Thorne
Tested this and it looks like it works great. See update above.Thorne
Generalization for multiple keys: @SafeVarargs public static <T> Predicate<T> distinctByKey( Function<? super T, Object>... keyExtractor) { Map<List<Object>, Boolean> seen = new ConcurrentHashMap<>(); return t -> { List<Object> list = Arrays.stream(keyExtractor).map(func -> func.apply(t)).collect(toList()); return seen.putIfAbsent(list, Boolean.TRUE) == null; }; }Hammerfest
inference is broken by the ::filter part: making DistrictByKey<T> implements Predicate<T> should do the trickBeshrew
Thanks for your update. Do you understand why the map is generated only once now? Hope someone gives a explanation.Aptitude
Hongbin Shen would like to comment: why Map<Object,Boolean> seen = new ConcurrentHashMap<>(); is called only once, is because distinctByKey() is called only once and it returns a Predicate; what filter() does is just call predicate.test() for each item. So the map in distinctByKey is created only once.Toluene
@OleV.V. and map is available for returned predicate because it is copied (this is how java treats closures)?Plover
@ŁukaszChorąży (It wasn’t my comment originally, I just pasted it in as a comment.) No, only one DistinctByKey instance is created, or only one Predicate instance in the last example, so it is the same map, not a copy.Toluene
but I believe it is available for Predicate instance because java copies local variables, onto the heap, when creating instance of the anonymous classPlover
O
32

You more or less have to do something like

 elements.stream()
    .collect(Collectors.toMap(
        obj -> extractKey(obj), 
        obj -> obj, 
       (first, second) -> first
           // pick the first if multiple values have the same key
       )).values().stream();
Olid answered 9/1, 2015 at 22:21 Comment(6)
Interesting, so I guess it's fair to say there's no native lambda way to do this, but there are workarounds like you've shown.Thorne
This is a more clear approach than a wrapper class. I don't have to go and read another class to understand what this does. The only advantage I can think of to using a wrapper class is if you plan to .limit(...) the stream. Wrap-distinct-unwrap approach will only process as many elements as necessary while collecting to a map will materialize the entire stream.Widower
The other benefit I can think of with the wrapper approach is it does not require an intermediary collect operation which disrupts the stream and requires a temporary collection to be built that only gets streamed again. But I see the merits of the Collectors.toMap() approach. It gives a lot to think about and my coworker and I will probably have some interesting discussions about it tomorrow.Thorne
distinct() works pretty much the same way under the hood, so I wouldn't worry.Olid
Yeah at this point I'm pretty sure it is all semantics and personal preference, and all the approaches here are valid. It's still enlightening though!Thorne
The distinct operation passes through each element as soon as it's known to be distinct. This might be significant if the source is an infinite stream. If the source is finite, collect(...).stream() is functionally similar. It might have a performance disadvantage since downstream operations aren't run until the collector completes. Both techniques do end up requiring the same amount of space in the intermediate collection though.Hagiarchy
F
10

Another way of finding distinct elements

List<String> uniqueObjects = ImmutableList.of("ABQ","ALB","CHI","CUN","PHX","PUJ","BWI")
            .stream()
            .collect(Collectors.groupingBy((p)->p.substring(0,1))) //expression 
            .values()
            .stream()
            .flatMap(e->e.stream().limit(1))
            .collect(Collectors.toList());
Freitag answered 25/7, 2017 at 14:54 Comment(0)
B
7

A variation on Stuart Marks second update. Using a Set.

public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
    Set<Object> seen = Collections.newSetFromMap(new ConcurrentHashMap<>());
    return t -> seen.add(keyExtractor.apply(t));
}
Broadside answered 11/12, 2015 at 10:5 Comment(3)
You can also use Collections.synchronizedSet(new HashSet<>())Odious
Set<Object> seen = ConcurrentHashMap.newKeySet()Faustina
As noted here: #23699871 you could use Function<? super T, ?> keyExtractor as parameter for better compatability.Fellini
C
5

We can also use RxJava (very powerful reactive extension library)

Observable.from(persons).distinct(Person::getName)

or

Observable.from(persons).distinct(p -> p.getName())
Claudie answered 25/6, 2015 at 18:53 Comment(4)
Haha funny you bring this up now, RxJava has been my latest obsession for the last two months.Thorne
RxExtensions are a very great CS idea. IMHO It will become soon a very usefull and ubiquitous tool something like the old regexp but much, much more importantClaudie
I agree, they will be the standard in coming years I'm guessing.Thorne
It's already higly standardized. It's avaliable in almost every language. Behind it there is a smart CS scientist: Erik MeijerClaudie
N
5

To answer your question in your second update:

The only part I'm trying to figure out is how the ConcurrentHashMap maintains only one instance for the entire stream:

public static <T> Predicate<T> distinctByKey(Function<? super T,Object> keyExtractor) {
        Map<Object,Boolean> seen = new ConcurrentHashMap<>();
        return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
    }

In your code sample, distinctByKey is only invoked one time, so the ConcurrentHashMap created just once. Here's an explanation:

The distinctByKey function is just a plain-old function that returns an object, and that object happens to be a Predicate. Keep in mind that a predicate is basically a piece of code that can be evaluated later. To manually evaluate a predicate, you must call a method in the Predicate interface such as test. So, the predicate

t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null

is merely a declaration that is not actually evaluated inside distinctByKey.

The predicate is passed around just like any other object. It is returned and passed into the filter operation, which basically evaluates the predicate repeatedly against each element of the stream by calling test.

I'm sure filter is more complicated than I made it out to be, but the point is, the predicate is evaluated many times outside of distinctByKey. There's nothing special* about distinctByKey; it's just a function that you've called one time, so the ConcurrentHashMap is only created one time.

*Apart from being well made, @stuart-marks :)

Neary answered 30/9, 2015 at 18:23 Comment(1)
If I don't want to do it in parallel, I can use HashMap instead of ConcurrentHashMap, right?Aptitude
D
2

You can use the distinct(HashingStrategy) method in Eclipse Collections.

List<String> list = Lists.mutable.with("ABQ", "ALB", "CHI", "CUN", "PHX", "PUJ", "BWI");
ListIterate.distinct(list, HashingStrategies.fromFunction(s -> s.substring(0, 1)))
    .each(System.out::println);

If you can refactor list to implement an Eclipse Collections interface, you can call the method directly on the list.

MutableList<String> list = Lists.mutable.with("ABQ", "ALB", "CHI", "CUN", "PHX", "PUJ", "BWI");
list.distinct(HashingStrategies.fromFunction(s -> s.substring(0, 1)))
    .each(System.out::println);

HashingStrategy is simply a strategy interface that allows you to define custom implementations of equals and hashcode.

public interface HashingStrategy<E>
{
    int computeHashCode(E object);
    boolean equals(E object1, E object2);
}

Note: I am a committer for Eclipse Collections.

Donate answered 11/1, 2016 at 22:8 Comment(0)
P
0

It can be done something like

Set<String> distinctCompany = orders.stream()
        .map(Order::getCompany)
        .collect(Collectors.toSet());
Peristome answered 29/5, 2017 at 5:18 Comment(0)
P
0

Set.add(element) returns true if the set did not already contain element, otherwise false. So you can do like this.

Set<String> set = new HashSet<>();
BigDecimal totalShare = orders.stream()
    .filter(c -> set.add(c.getCompany().getId()))
    .map(c -> c.getShare())
    .reduce(BigDecimal.ZERO, BigDecimal::add);

If you want to do this parallel, you must use concurrent map.

Piercy answered 30/6, 2017 at 20:0 Comment(1)
The Javadoc warns against using stateful lambdas in streams.Donahoe

© 2022 - 2024 — McMap. All rights reserved.