Partition a Stream by a discriminator function

One of the missing features in the Streams API is the "partition by" transformation, for example as defined in Clojure. Say I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement to receive objects of this kind from the result:

class Family {
   String surname;
   List<String> members;
}

I issue:

SELECT f.name, m.name 
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name

and I retrieve a flat stream of (f.name, m.name) records. Now I need to transform it into a stream of Family objects, each with the list of its members inside. Assume I already have a Stream<ResultRow>; now I need to transform it into a Stream<List<ResultRow>> and then act upon that with a mapping transformation which turns it into a Stream<Family>.

The semantics of the transformation are as follows: keep collecting the stream into a List for as long as the provided discriminator function keeps returning the same value; as soon as the value changes, emit the List as an element of the output stream and start collecting a new List.
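
To make the semantics concrete, this is what I would expect from such a partitionBy operation (which does not exist yet) on a toy stream:

Stream<List<String>> chunks =
        partitionBy(String::length, Stream.of("a", "b", "cc", "dd", "e"));
// expected partitions, in encounter order: [a, b], [cc, dd], [e]
// note that the key (length 1) recurs in a separate list; groupingBy would merge those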

I hope to be able to write this kind of code (I already have the resultStream method):

Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
      + " FROM Family f JOIN Member m on m.family_id = f.id"
      + " ORDER BY f.name"));
Stream<List<ResultRow>> partitioned = partitionBy(r -> r.string(0), dbStream);
Stream<Family> families = partitioned.map(rs -> {
                    Family f = new Family(rs.get(0).string(0));
                    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
                    return f;
                 });

Needless to say, I expect the resulting stream to stay lazy (non-materialized) as I want to be able to process a result set of any size without hitting any O(n) memory limits. Without this crucial requirement I would be happy with the provided groupingBy collector.
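
For comparison, the groupingBy route is a terminal collect that buffers every row before the first Family could be produced (a sketch, assuming string(0) returns a String):

// materializes the whole result set into a Map up front, which is exactly what I want to avoid
Map<String, List<ResultRow>> buffered =
        dbStream.collect(Collectors.groupingBy(r -> r.string(0)));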

Compensation answered 6/2, 2015 at 10:12 Comment(2)
I’m not sure but is it possible that this is identical to this problem?Siderite
Yes, I believe it is. I am actually considering mentioning the similarity with groupingBy and the key distinction. The solutions provided are quite different, though.Compensation

The solution requires us to define a custom Spliterator which can be used to construct the partitioned stream. We need to access the input stream through its own spliterator and wrap it in ours; the output stream is then constructed from our custom spliterator.

The following Spliterator will turn any Stream<E> into a Stream<List<E>> provided a Function<E, ?> as the discriminator function. Note that the input stream must be ordered for this operation to make sense.

import java.util.*;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Comparator.naturalOrder;

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> spliterator;
    private final Function<? super E, ?> partitionBy;
    private HoldingConsumer<E> holder;
    private Comparator<List<E>> comparator;

    public PartitionBySpliterator(
            Spliterator<E> toWrap,
            Function<? super E, ?> partitionBy
    ) {
        super(Long.MAX_VALUE, toWrap.characteristics() & ~SIZED | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
    }

    public static <E> Stream<List<E>> partitionBy(
            Function<E, ?> partitionBy, Stream<E> in
    ) {
        return StreamSupport.stream(
                new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
            // First call, or the source was exhausted on the previous call:
            // pull the first element of the next partition, if there is one.
            h = new HoldingConsumer<>();
            if (!spliterator.tryAdvance(h)) {
                return false;
            }
            holder = h;
        } else {
            // The previous call already pulled the first element of this partition.
            h = holder;
        }
        final ArrayList<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        // Keep pulling elements while the discriminator keeps returning the same key.
        do {
            partition.add(h.value);
        }
        while ((didAdvance = spliterator.tryAdvance(h))
                && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) {
            // Source exhausted: there is no leftover element to start the next partition.
            holder = null;
        }
        action.accept(partition);
        return true;
    }

    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    @Override
    public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null ? c : (this.comparator = comparator());
    }

    private Comparator<List<E>> comparator() {
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Comparator<? super E> innerComparator =
                Optional.ofNullable(spliterator.getComparator())
                        .orElse((Comparator) naturalOrder());
        return (left, right) -> {
            final int c = innerComparator.compare(left.get(0), right.get(0));
            return c != 0 ? c : innerComparator.compare(
                    left.get(left.size() - 1), right.get(right.size() - 1));
        };
    }
}
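
A quick usage sketch on toy data, just to show the shape of the output (the pipeline stays lazy, pulling one partition at a time):

// prints [aa, ab], then [ba], then [ca, cb]
PartitionBySpliterator
        .partitionBy((String s) -> s.charAt(0), Stream.of("aa", "ab", "ba", "ca", "cb"))
        .forEach(System.out::println);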
Compensation answered 6/2, 2015 at 10:12 Comment(17)
Nice! What made you choose Stream<List<E>> rather than Stream<Stream<E>>?Mauri
Well, since that's how I construct it, I find it more general to emit it that way, too. If I emitted a stream and the client just needed the list, I would force him to copy it into a new list. Look at my use case: I need special treatment of the first list element, which I fetch before entering the loop over all elements. That would get very awkward if I got a stream.Compensation
In your Family scenario that's fine but when the partition key changes very rarely (happens to me a lot when the query result takes hours to come across and contains millions of records) then a Stream<Stream<E>> solution would be necessary to retain your O(n) memory requirements. I accept that would be much more complicated though - or would it?Mauri
That would break the semantics of the inner stream: what if you issued partitionedStream.skip(1)...? In general terms, accessing any element of the stream-of-streams would require the full consumption (and retention!) of all its predecessors.Compensation
It is not correct to simply pass the source’s characteristics() to your own spliterator. E.g. if the source is SIZED it does not imply that your spliterator is as well, to be exact, it’s clear that your spliterator is not SIZED. On the other hand, your spliterator may report NONNULL even if the source doesn’t. You may retain ORDERED safely and even SORTED if you provide an appropriate Comparator which delegates to the source’s Comparator.Siderite
@Siderite I've written out the comparator, but honestly, I don't see the point of doing that. SORTED with getComparator() returning null makes clear sense, but here I'm pulling a comparator out of thin air, second-guessing the sort order that emerges from the chunking. Do you know of a case where this will be of any help? Maybe if the comparator were a singleton for the spliterator instance it could help?Compensation
I don’t think that there is anything in the current implementation gaining benefit of this. However, who knows what the future will be… I just wanted to say that you can retain a SORTED flag but due to the contract you have to provide a valid comparator. That wouldn’t be guesswork; if the source spliterator reports SORTED it has to provide an appropriate comparator and if you retain the order, you know that there is a total ordering between the lists due to their contained elements so all your comparator has to do is to compare two arbitrary elements of the lists using source comparator…Siderite
@Siderite Not exactly any arbitrary elements, but perhaps just the first one of the left list with the last one of the right list. Consider the worst case: all elements in both list the same except the last one of the second list. Or except the first one in the first list.Compensation
Right. I forgot that SORTED doesn’t automatically imply DISTINCTSiderite
@Siderite DISTINCT wouldn't cover it, either, as that would just imply distinctness by the equality relation. They could still sort the same, even by natural order.Compensation
You are right. An alternative to your solution would be: Function<List<E>,E> first=l->l.get(0), last=l->l.get(l.size()-1); return Comparator.comparing(first, innerComparator) .thenComparing( Comparator.comparing(last, innerComparator));Siderite
@Siderite Nice work! Just one detail: comparing(first, innerComparator).thenComparing(last, innerComparator) is cleaner.Compensation
@Siderite Also, in the style of JDK code I'm worried about performance, which is why I dropped several FP-esque idioms of my own in favor of the most direct code, at least within the comparator method, which will be called many times.Compensation
Thanks for this, it's been really useful for me! I've used it for splitting up a stream of lines read from an input csv file, where I know the file is sorted by a certain value.Detector
Could you replace the last 3 lines of the tryAdvance method, i.e. if (!didAdvance) holder = null; action.accept(partition); return true; with these two lines: action.accept(partition); return didAdvance;?Detector
@Detector Did you try it? I expect two bugs: the return value of didAdvance would be wrong on the last advancement, and each subsequent call of tryAdvance() would fail with NPE. The latter is actually up to the partitioning function, but since it can otherwise assume a non-null argument, it is highly likely to fail when passed null.Compensation
@MarkoTopolnik You're totally right, I tried it and it was messing up my answers, but not in an obvious way... (I didn't get the NPE because I changed the instantiation of the holder.) It meant that tryAdvance was breaking the Interface contract because it returned false even when it had successfully consumed stuff. Thanks for your reply!Detector

For those of you who just want to partition a stream and don't need the result to stay lazy, there are mappers and collectors for that.

class Person {

    String surname;
    String forename;

    public Person(String surname, String forename) {
        this.surname = surname;
        this.forename = forename;
    }

    @Override
    public String toString() {
        return forename;
    }

}

class Family {

    String surname;
    List<Person> members;

    public Family(String surname, List<Person> members) {
        this.surname = surname;
        this.members = members;
    }

    @Override
    public String toString() {
        return "Family{" + "surname=" + surname + ", members=" + members + '}';
    }

}

private void test() {
    String[][] data = {
        {"Kray", "Ronald"},
        {"Kray", "Reginald"},
        {"Dors", "Diana"},};
    // Their families.
    Stream<Family> families = Arrays.stream(data)
            // Build people
            .map(a -> new Person(a[0], a[1]))
            // Collect into a Map<String,List<Person>> as families
            .collect(Collectors.groupingBy(p -> p.surname))
            // Convert them to families.
            .entrySet().stream()
            .map(p -> new Family(p.getKey(), p.getValue()));
    families.forEach(f -> System.out.println(f));
}
Mauri answered 27/8, 2015 at 15:11 Comment(1)
The data in your stream is already partitioned correctly through the sub-arrays. What Marko is talking about is a completely flat stream. Basically the data would be a String[]. That makes your solution work well on an already partitioned / grouped Stream but unfortunately unusable for a completely flat stream :/Bedside

It can be done with the collapse operation from StreamEx:

StreamEx.of(resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
        + " FROM Family f JOIN Member m on m.family_id = f.id"
        + " ORDER BY f.name")))
        .collapse((a, b) -> a.string(0).equals(b.string(0)), Collectors.toList())
        .map(l -> new Family(l.get(0).string(0), StreamEx.of(l).map(r -> r.string(1)).toList()))
        .forEach(System.out::println);
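
A minimal illustration of the same collapse behaviour on in-memory data (assuming one.util.streamex.StreamEx and java.util.stream.Collectors are imported):

// adjacent equal elements are grouped into one list each; prints [a, a], [b], [a]
StreamEx.of("a", "a", "b", "a")
        .collapse(Object::equals, Collectors.toList())
        .forEach(System.out::println);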
Choline answered 12/6, 2017 at 20:5 Comment(0)
