Collect successive pairs from a stream
Asked Answered
C

23

121

Given an object or primitive stream such as { 0, 1, 2, 3, 4 }, how can I most elegantly transform it into the form given below (assuming, of course, I've defined class Pair)?

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

Caldera answered 9/12, 2013 at 11:55 Comment(5)
The term from FP is "partition", but I'm not finding a method with the desired semantics in Java. It has partitioning on a predicate.Inlier
Typically, the spliterator in JDK 8 is intended for traversing and partitioning purposes. I will try to come up with an example as well.Synn
list.stream().map(i -> new Pair(i, i+1));Carbone
For the equivalent non streams question, see #17453522Vassal
By the way, some folks use either implementation of Map.Entry as a Pair class. (Granted, some might consider that a hack, but using a built-in class is handy.)Oft
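For readers following along, here is a minimal sketch of the kind of Pair the question assumes (the names are illustrative; as the comment above notes, Map.Entry can serve as a ready-made alternative):

import java.util.Map;

// Illustrative only: a minimal Pair of the kind the question assumes.
// On a pre-16 JDK, a small final class with two fields and a toString works the same way.
public class PairDemo {
    record Pair<T>(T first, T second) { }

    public static void main(String[] args) {
        Pair<Integer> viaRecord = new Pair<>(0, 1);
        Map.Entry<Integer, Integer> viaMapEntry = Map.entry(0, 1); // Map.Entry used as a pair (Java 9+)
        System.out.println(viaRecord + " / " + viaMapEntry);
    }
}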
A
3

There is built-in support for this in JEP 461: Stream Gatherers, a Java 22 preview language feature:

Stream.of(0, 1, 2, 3, 4)
        .gather(Gatherers.windowSliding(2))
        .map(list -> new Pair(list.get(0), list.get(1)))
        .toList();

This uses the new Stream.gather method with the new built-in Gatherers.windowSliding gatherer to convert the initial Stream<Integer> ([0, 1, 2, 3, 4]) to a pairwise Stream<List<Integer>> ([[0, 1], [1, 2], [2, 3], [3, 4]]). Each of these lists is then transformed to a Pair using the existing Stream.map method.

Javadocs

Gatherer:

An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. […]

[…]

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); incremental reordering functions, etc. The class Gatherers provides implementations of common gathering operations.

Stream.gather:

Returns a stream consisting of the results of applying the given gatherer to the elements of this stream.

Gatherers.windowSliding:

Returns a Gatherer that gathers elements into windows -- encounter-ordered groups of elements -- of a given size, where each subsequent window includes all elements of the previous window except for the least recent, and adds the next element in the stream. […]

Example:

// will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
List<List<Integer>> windows2 =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();

// will contain: [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
List<List<Integer>> windows6 =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();
Alston answered 21/12, 2023 at 18:7 Comment(0)
C
84

The Java 8 streams library is primarily geared toward splitting streams into smaller chunks for parallel processing, so stateful pipeline stages are quite limited; operations such as getting the index of the current stream element or accessing adjacent stream elements are not supported.

A typical way to solve these problems, with some limitations, of course, is to drive the stream by indexes and to rely on having the values in some random-access data structure, such as an ArrayList, from which the elements can be retrieved. If the values were in a list named arrayList, one could generate the pairs as requested by doing something like this:

    IntStream.range(1, arrayList.size())
             .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
             .forEach(System.out::println);

Of course the limitation is that the input cannot be an infinite stream. This pipeline can be run in parallel, though.
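As a sketch of the parallel variant mentioned above (each pair depends only on its index, so the mapping itself is safe to parallelize; forEachOrdered keeps the output in encounter order):

    IntStream.range(1, arrayList.size())
             .parallel()
             .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
             .forEachOrdered(System.out::println);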

Chamblee answered 11/12, 2013 at 0:7 Comment(2)
"The input cannot be an infinite stream." Actually, the input cannot be a stream at all. The input (arrayList) is in fact a collection, which is why I didn't mark it as the answer. (But congrats on your gold badge!)Caldera
is there a way in streams to conditionally skip the next iteration, i.e. increment the forEach or mapToObj index to i+2 instead of i+1? Or is that not a recommended use case for streams or functional programming in Java?Lashandralashar
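Regarding the step question in the comment above: a sketch of non-overlapping pairs (a step of two) using the same index-driven idea, reusing the arrayList and Pair from this answer:

    IntStream.range(0, arrayList.size() / 2)
             .mapToObj(i -> new Pair(arrayList.get(2*i), arrayList.get(2*i + 1)))
             .forEach(System.out::println);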
W
39

My StreamEx library, which extends the standard streams, provides a pairMap method for all stream types. For primitive streams it does not change the stream type, but it can still be used for pairwise calculations. The most common usage is to calculate differences:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

For an object stream you can produce any other object type. My library does not provide any new user-visible data structures such as Pair (that is part of the library's concept). However, if you have your own Pair class and want to use it, you can do the following:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

Or if you already have some Stream:

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

This functionality is implemented using a custom spliterator. It has quite low overhead and can parallelize nicely. Of course, it works with any stream source, not just a random-access list or array as many other solutions require. In many tests it performs really well. Here's a JMH benchmark where we find all input values preceding a larger value using different approaches (see this question).

Wonacott answered 30/5, 2015 at 3:49 Comment(5)
Thank you! The more I study this library, the more I love it. I might finally start using streams. (StreamEx implements Iterable! Hurrah!)Caldera
To make your answer 100% complete, could you show how to wrap a Stream into a StreamEx?Caldera
@AleksandrDubinsky: just use StreamEx.of(stream). There are other convenient static methods to create the stream from Collection, array, Reader, etc. Edited the answer.Wonacott
@TagirValeev is pairMap ordered on sequential streams? Actually, I would like to have forPairsOrdered(), but as there is no such method, can I simulate it somehow? stream.ordered().forPairs() or stream().pairMap().forEachOrdered()?Ghoul
@AskarKalykov, pairMap is an intermediate operation with a non-interfering, stateless mapper function; its ordering is not specified, in the same way as for plain map. forPairs is unordered by specification, but unordered operations are de facto ordered for sequential streams. It would be nice if you formulated your original problem as a separate stackoverflow question to provide more context.Wonacott
P
23

You can do this with the Stream.reduce() method (I haven't seen any other answers using this technique).

public static <T> List<Pair<T, T>> consecutive(List<T> list) {
    List<Pair<T, T>> pairs = new LinkedList<>();
    list.stream().reduce((a, b) -> {
        pairs.add(new Pair<>(a, b));
        return b;
    });
    return pairs;
}
Pusillanimity answered 22/3, 2017 at 17:41 Comment(6)
It would return (1,2) (2,3) instead of (1,2) (3,4). Also I'm not sure if it would be applied in order (certainly there's no guarantee of that).Caldera
Please check the question, that is the intended behaviour @Aleksandr DubinskyPusillanimity
Ahh, yes, sorry. And to think, I wrote it.Caldera
The order seems to be guaranteed according to Is .collect guaranteed to be ordered on parallel streams? and How to ensure order of processing in java8 streams?Ineluctable
This is a very smart idea! The only problem I would see in this approach is that the reduction function is not pure (it depends on the external pairs object). Therefore, if run concurrently, its semantic correctness is not ensured. One possible solution would be to use a thread-safe data structure, such as Vector.Albinus
I like the idea, except for the construction of a new list, which is necessary because reduce is a terminal operation. Should be able to do this lazily.Quianaquibble
P
18

This is not elegant; it's a hackish solution, but it works for infinite streams:

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers
    new Function<Integer, Pair>() {
        Integer previous;

        @Override
        public Pair apply(Integer integer) {
            Pair pair = null;
            if (previous != null) pair = new Pair(previous, integer);
            previous = integer;
            return pair;
        }
    }).skip(1); // drop first null

Now you can limit your stream to the length you want

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

P.S. I hope there is a better solution, something like Clojure's (partition 2 1 stream).

Pope answered 9/12, 2013 at 15:4 Comment(10)
Kudos for pointing out that anonymous classes are a sometimes useful alternative to lambdas.Caldera
how will this deal with parallel streams?Carbone
@Carbone I assume it won't work correctly. According to the parallelStream doc: "To preserve correct behavior, these behavioral parameters must be non-interfering, and in most cases must be stateless"Pope
This is completely contrary to the design of the streams framework and directly violates the contract of the map API, as the anonymous function is not stateless. Try running this with a parallel stream and more data so the stream framework creates more working threads, and you will see the result: infrequent random "errors" almost impossible to reproduce and difficult to detect until you have data enough (in production?). This can be disastrous.Vincenza
@MarioRossi The Streams framework does not exist to only write parallel code. Its butt, unfortunately, sits on two sides of the fence, and many programmers use it to write sequential code. There are even built-in methods that cannot be parallelized (such as skip).Caldera
@AleksandrDubinsky You are incorrect about limit/skip being parallelizable; the implementation provided in JDK does in fact work in parallel. Because the operation is tied to encounter order, parallelization may not always provide a performance benefit, but in high-Q situations, it can.Rebuff
@BrianGoetz Last time I checked, parallel skip would skip a random element. I'd needed to use forEachOrdered (instead of forEach) as the terminal operation, which made my short pipeline serial. There was no facility to say "I need this part of the pipeline ordered, and this part parallel." (The API preposterously ignores the ordering of calls to parallel, ordered, unordered, and forEach.) I concede an ordered, parallel pipeline that terminates in a reduction could see correct behavior from skip. But I've given up on Streams and don't use them, especially for parallelism.Caldera
@AleksandrDubinsky Incorrect. It may skip a random element if the stream is unordered (has no defined encounter order, so logically there is no "first" or "nth" element, just elements.) But whether the stream is ordered or unordered, skip has always been able to work in parallel. There's just less parallelism to extract if the stream is ordered, but its still parallel.Rebuff
@BrianGoetz I do not want to waste the time of someone of your stature. You are right. (I attempted to admit that in my last comment, oh well.) Streams suck nevertheless. (Useless, random, surprising behavior instead of an exception?) Unfortunately, I cannot edit my original comment, except to delete it and re-write it.Caldera
You can eliminate the parallel issues with skip by replacing it with filter(p -> p != null). That doesn't take care of the issues with using a stateful mapper, but still.Chaplain
K
17

I've implemented a spliterator wrapper which takes each group of n consecutive elements T from the original spliterator and produces a List<T>:

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> {

    private final Spliterator<T> wrappedSpliterator;

    private final int n;

    private final Deque<T> deque;

    private final Consumer<T> dequeConsumer;

    public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) {
        this.wrappedSpliterator = wrappedSpliterator;
        this.n = n;
        this.deque = new ArrayDeque<>();
        this.dequeConsumer = deque::addLast;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        deque.pollFirst();
        fillDeque();
        if (deque.size() == n) {
            List<T> list = new ArrayList<>(deque);
            action.accept(list);
            return true;
        } else {
            return false;
        }
    }

    private void fillDeque() {
        while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer))
            ;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return wrappedSpliterator.estimateSize();
    }

    @Override
    public int characteristics() {
        return wrappedSpliterator.characteristics();
    }
}

The following method may be used to create a consecutive stream:

public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) {
    Spliterator<E> spliterator = stream.spliterator();
    Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n);
    return StreamSupport.stream(wrapper, false);
}

Sample usage:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2)
    .map(list -> new Pair(list.get(0), list.get(1)))
    .forEach(System.out::println);
Karalee answered 9/12, 2013 at 18:11 Comment(8)
Does that repeat every element twice?Caldera
Nope. It creates a new stream containing List<E> elements. Each list contains n consecutive elements from the original stream. Check it yourself ;)Madisonmadlen
Could you modify your answer so that every element (except the first and last) is repeated?Caldera
Sorry, I didn't notice that elements should be repeated. I've fixed my solution.Madisonmadlen
+1 I think this is good work and should be generalized to any step size in addition to the partition size. There is a lot of need for a (partition size step) function and this is about the best way to get it.Inlier
Consider using ArrayDeque for performance, in preference to LinkedList.Inlier
The value assigned to dequeConsumer (new Consumer<T>() ...) is somewhat unnecessarily wordy for Java 8. It can be replaced with deque::addLast which is a lot clearer.Matteo
If step parameter should be also taken into account, just replace deque.pollFirst(); statement with for (int i = 0; i < step && deque.pollFirst() != null; ++i) {} loopTendency
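A sketch of that suggestion, assuming a step field is added alongside n in the ConsecutiveSpliterator above:

@Override
public boolean tryAdvance(Consumer<? super List<T>> action) {
    // drop up to 'step' elements from the front of the previous window
    for (int i = 0; i < step && deque.pollFirst() != null; ++i) {
    }
    fillDeque();
    if (deque.size() == n) {
        action.accept(new ArrayList<>(deque));
        return true;
    }
    return false;
}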
A
8

You can do this in cyclops-react (I contribute to this library), using the sliding operator.

  LazyFutureStream.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

Or

   ReactiveSeq.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

This assumes the Pair constructor can accept a Collection with two elements.
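A sketch of such a constructor (the class is illustrative), taking the two-element windows that sliding(2) produces so that .map(Pair::new) resolves:

import java.util.Collection;
import java.util.Iterator;

// Illustrative Pair whose constructor accepts a two-element collection.
public class Pair {
    private final Object first;
    private final Object second;

    public Pair(Collection<?> window) {
        Iterator<?> it = window.iterator();
        this.first = it.next();
        this.second = it.next();
    }

    @Override
    public String toString() {
        return "(" + first + ", " + second + ")";
    }
}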

If you wanted to group by 4 and increment by 2, that is also supported:

     ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE)
                .sliding(4,2)
                .forEach(System.out::println);

Equivalent static methods for creating a sliding view over java.util.stream.Stream are also provided in the cyclops-streams StreamUtils class:

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

Note: for single-threaded operation, ReactiveSeq would be more appropriate. LazyFutureStream extends ReactiveSeq but is primarily geared toward concurrent / parallel use (it is a Stream of Futures).

LazyFutureStream extends ReactiveSeq, which extends Seq from the awesome jOOλ (which in turn extends java.util.stream.Stream), so the solutions Lukas presents would also work with either Stream type. For anyone interested, the primary differences between the window / sliding operators are the relative power / complexity trade-off and the suitability for use with infinite streams (sliding doesn't consume the stream, but buffers it as it flows).

Abyss answered 29/3, 2015 at 20:54 Comment(2)
This way you obtain [(0,1)(2,3) ...] but the question asks [(0,1)(1,2) ...]. Please see my answer with RxJava...Inspirational
You are right, my bad, I misread the question - the sliding operator is the correct one to use here. I'll update my answer - thanks!Abyss
R
6

Streams.zip(..) is available in Guava, for those who depend on it.

Example:

// zip is lazy, so a terminal operation is needed for the pairs to actually be produced
Streams.zip(list.stream(),
            list.stream().skip(1),
            (a, b) -> String.format("%s %s", a, b))
       .forEach(System.out::println);
Rawdan answered 7/9, 2019 at 2:59 Comment(0)
C
5

The proton-pack library provides the windowed functionality. Given a Pair class and a Stream, you can do it like this:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1);
Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1)
                                                  .map(l -> new Pair<>(l.get(0), l.get(1)))
                                                  .moreStreamOps(...);

Now the pairs stream contains:

(0, 1)
(1, 2)
(2, 3)
(3, 4)
(4, ...) and so on
Carhart answered 4/3, 2015 at 15:44 Comment(4)
However, it looks like you need to create st twice! Can this library solve the problem using a single stream?Caldera
@AleksandrDubinsky I don't think so it's available with the current spliterators. I submitted an issue github.com/poetix/protonpack/issues/9Carhart
@AleksandrDubinsky The windowed functionality has been added! See the edit.Carhart
Why don't you delete your old answer, so that other users can see the solution, not history.Caldera
H
5

Finding successive pairs

If you're willing to use a third-party library and don't need parallelism, then jOOλ offers SQL-style window functions as follows:

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window()
   .filter(w -> w.lead().isPresent())
   .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class
   .toList()
);

Yielding

[(0, 1), (1, 2), (2, 3), (3, 4)]

The lead() function accesses the next value in traversal order from the window.

Finding successive triples / quadruples / n-tuples

A question in the comments asked for a more general solution, where not pairs but n-tuples (or possibly lists) should be collected. Here, then, is an alternative approach:

int n = 3;

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

Yielding a list of lists

[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

Without the filter(w -> w.count() == n), the result would be

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

Disclaimer: I work for the company behind jOOλ

Hoiden answered 7/1, 2016 at 0:22 Comment(4)
Interesting. What if I need to group 3 or more elements? Use w.lead().lead()?Intercourse
@RaulSantelices: tuple(w.value(), w.lead(1), w.lead(2)) would be an option. I've updated my answer with a more generic solution for length = nHoiden
Do I understand correctly that .window() is not a lazy operation, i.e. it collects the whole input stream into some intermediate collection and then creates a new stream from it?Wonacott
@TagirValeev: Yes, that's the current implementation. In the above case (no Comparator is used to reorder windows), then an optimisation like this would be possible, and is likely to be implemented in the future.Hoiden
V
3

The operation is essentially stateful, so it is not really what streams are meant to solve; see the "Stateless Behaviors" section in the javadoc:

The best approach is to avoid stateful behavioral parameters to stream operations entirely

One solution here is to introduce state into your stream pipeline through an external holder of the previous element (an AtomicReference below), although it will only work with a sequential stream.

public static void main(String[] args) {
    Stream<String> strings = Stream.of("a", "b", "c", "c");
    AtomicReference<String> previous = new AtomicReference<>();
    List<Pair> collect = strings.map(n -> {
                            String p = previous.getAndSet(n);
                            return p == null ? null : new Pair(p, n);
                        })
                        .filter(p -> p != null)
                        .collect(toList());
    System.out.println(collect);
}


static class Pair<T> {
    private T left, right;
    Pair(T left, T right) { this.left = left; this.right = right; }
    @Override public String toString() { return "{" + left + "," + right + '}'; }
}
Vigen answered 11/12, 2013 at 1:35 Comment(5)
The question asks to collect successive elements of an input stream, not merely to collect successive integers. An important clarification of terminology: Stream != "lambdas".Caldera
You could replace AtomicInteger by an AtomicReference. The alternative is to roll your own collector or use external libraries such as in this example: https://mcmap.net/q/56575/-how-to-use-java-8-streams-to-find-all-values-preceding-a-larger-valueVigen
See my edit. Also I'm not sure I understand your comment on lambda != stream. The other answer that uses an anonymous class does essentially the same thing except that the state is held by the anonymous class instead of being external...Vigen
That works. The StreamEx library is also a good find and could be an answer in itself. My comment on "streams != lambdas" refers to you stating "The operation is essentially stateful so not really what lambdas are meant to solve." I think you meant to use the word "streams".Caldera
Oh I see - I've clarified that.Vigen
I
2

We can use RxJava (very powerful reactive extension library)

IntStream intStream  = IntStream.iterate(1, n -> n + 1);

Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1);

pairObservable.take(10).forEach(b -> {
            b.forEach(n -> System.out.println(n));
            System.out.println();
        });

The buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items.

Inspirational answered 9/1, 2016 at 16:45 Comment(1)
I used Observable.zip(obs, obs.skip(1), pair->{...}) up until now! I didn't know Observable.buffer had a version with a step (and I'm used to the zip trick from python). +1Windbroken
G
2

You could use Reactor's Flux:

// Use the Supplier overload of fromStream: zipWith subscribes to someFlux twice
// (once directly and once through skip(1)), and a Flux built from a single
// Stream instance can only be subscribed once.
Flux<String> someFlux = Flux.fromStream(() -> Stream.of("A", "B", "C", "D"));

someFlux.zipWith(someFlux.skip(1))
    .map(t -> t.getT1().concat(t.getT2()))
    .subscribe(System.out::println);

The result would be:

AB
BC
CD
Gimbals answered 3/1, 2023 at 11:12 Comment(0)
M
1

I finally figured out a way of tricking Stream.reduce into dealing neatly with pairs of values; there are a multitude of use cases that require this facility, which does not appear naturally in JDK 8:

public static int ArithGeo(int[] arr) {
    // collect the pairwise differences and ratios between adjacent elements
    List<Integer> diffList = new ArrayList<>();
    List<Integer> divList = new ArrayList<>();
    Arrays.stream(arr).reduce((left, right) -> {
        diffList.add(right-left);
        divList.add(right/left);
        return right;
    });
    //Arithmetic
    if(diffList.stream().distinct().count() == 1) {
        return 1;
    }
    //Geometric
    if(divList.stream().distinct().count() == 1) {
        return 2;
    }
    return -1;
}

The trick I use is the return right; statement.

Melaniamelanic answered 3/4, 2019 at 15:2 Comment(3)
I don't think reduce makes sufficient guarantees for this to work.Caldera
Would be interested to know more about the sufficient guarantees. Can you please elaborate? Maybe there is an alternative in Guava...but I am constrained and cannot use it.Melaniamelanic
Reduce works like a charm for paired filtering.Iridis
Q
1

The solutions here seem a little complicated or depend on third-party libraries. This problem can be solved with an intermediate stream which collects pairs:

public static <T> Stream<List<T>> pairs(Stream<T> stream) {
    Iterator<T> iterator = stream.iterator();
    if (iterator.hasNext()) {
        T first = iterator.next();
        if (iterator.hasNext()) {
            return Stream.iterate(
                List.of(first, iterator.next()),
                prev -> iterator.hasNext() ? List.of(prev.get(1), iterator.next()) : null)
                .takeWhile(prev -> prev != null);
        }
    }
    return Stream.empty();
}

Examples:

pairs(Stream.of()).toList();      // []
pairs(Stream.of(1)).toList();     // []
pairs(Stream.of(1,2)).toList();   // [[1, 2]]
pairs(Stream.of(1,2,3)).toList(); // [[1, 2], [2, 3]]
pairs(Stream.of("a","b","c","d")).toList();  // [[a, b], [b, c], [c, d]]

In this solution, Stream.iterate is using an accumulator in much the same way as reduce, except that it creates an intermediate stream rather than being a terminal operation. So laziness and infinite streams are supported.
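A quick sketch of that laziness with an infinite source (limit stops the pairing after three pairs):

pairs(Stream.iterate(0, i -> i + 1))
    .limit(3)
    .forEach(System.out::println); // prints [0, 1], [1, 2], [2, 3]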

Quianaquibble answered 24/6, 2022 at 8:27 Comment(0)
R
0

In your case, I would write a custom IntFunction which keeps track of the last int passed and use that to map the original IntStream.

import java.util.function.IntFunction;
import java.util.stream.IntStream;

public class PairFunction implements IntFunction<PairFunction.Pair> {

  public static class Pair {

    private final int first;
    private final int second;

    public Pair(int first, int second) {
      this.first = first;
      this.second = second;
    }

    @Override
    public String toString() {
      return "[" + first + "|" + second + "]";
    }
  }

  private int last;
  private boolean first = true;

  @Override
  public Pair apply(int value) {
    Pair pair = !first ? new Pair(last, value) : null;
    last = value;
    first = false;
    return pair;
  }

  public static void main(String[] args) {

    IntStream intStream = IntStream.of(0, 1, 2, 3, 4);
    final PairFunction pairFunction = new PairFunction();
    intStream.mapToObj(pairFunction)
        .filter(p -> p != null) // filter out the null
        .forEach(System.out::println); // display each Pair

  }

}
Repellent answered 9/12, 2013 at 15:9 Comment(7)
Problem with this is it throws statelessness out the window.Corgi
@Corgi and what's the problem with that?Caldera
One of the main points of lambda is to not have mutable state so that the internal iterators can parallelize the work.Corgi
@Rob: Yeah, you are right, but the given example stream defies parallelism anyway as each item (except the first and last ones) is used as a first and a second item of some pair.Repellent
@Repellent yeah I figured that's what you were thinking. I wonder though if there is not a way to do this with some other mapper. In essence all you'd need would be the equivalent of making the loop incrementer go by twos then have the functor take 2 arguments. That must be possible.Corgi
Wait, that would not work because the result pairs overlap. Of course you could have the non-overlapping pairs made first and then build the overlapping ones from the created ones. But that would be multipass. Would be interesting to see how that would perform.Corgi
@Corgi Prematurely optimize much? I don't get the whole emphasis on parallelism with streams. Parallelism is for special situations only. Besides, you can just put .parallel() after the map(byPairs).Caldera
R
0

An elegant solution would be to use zip. Something like:

List<Integer> input = Arrays.asList(0, 1, 2, 3, 4);
Stream<Pair> pairStream = Streams.zip(input.stream(),
                                      input.stream().substream(1),
                                      (a, b) -> new Pair(a, b)
);

This is pretty concise and elegant; however, it uses a list as an input. An infinite stream source cannot be processed this way.

Another (much more troublesome) issue is that zip, together with the entire Streams class, has recently been removed from the API. The above code only works with build b95 or older. So with the latest JDK I would say there is no elegant FP-style solution, and right now we can just hope that zip will somehow be reintroduced to the API.

Retired answered 10/12, 2013 at 22:55 Comment(4)
Indeed, zip was removed. I don't remember all of what was on the Streams class, but some things have migrated to be static methods on the Stream interface, and there are also StreamSupport and Stream.Builder classes.Chamblee
That's right. Some other methods like concat or iterate have been moved and became static methods on Stream. Sadly, zip was just removed from the API. I understand the reasons behind this choice (e.g. lack of Tuples) but still it was a nice feature.Retired
@Retired What do tuples have to do with zip? Whatever pedantic reason might be invented does not justify killing zip.Caldera
@AleksandrDubinsky In most cases zip is used to produce a collection of Pairs/Tuples as an output. They argued that if they kept zip people would ask for Tuples as part of the JDK as well. I would have never removed an existing feature though.Retired
B
0

For calculating successive differences in the time (x-values) of a time-series, I use the stream's collect(...) method:

final List< Long > intervals = timeSeries.data().stream()
                    .map( TimeSeries.Datum::x )
                    .collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine )
                    .intervals();

Where the DifferenceCollector is something like this:

public class DifferenceCollector implements LongConsumer
{
    private final List< Long > intervals = new ArrayList<>();
    private Long lastTime;

    @Override
    public void accept( final long time )
    {
        if( Objects.isNull( lastTime ) )
        {
            lastTime = time;
        }
        else
        {
            intervals.add( time - lastTime );
            lastTime = time;
        }
    }

    public void combine( final DifferenceCollector other )
    {
        intervals.addAll( other.intervals );
        lastTime = other.lastTime;
    }

    public List< Long > intervals()
    {
        return intervals;
    }
}

You could probably modify this to suit your needs.
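For the original question, the same collect(supplier, accumulator, combiner) pattern could be used to keep adjacent pairs instead of differences. A sketch (the PairCollector name and the use of List.of for a pair are illustrative, and, like the collector above, the combiner is only suitable for sequential use):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PairCollector<T> implements Consumer<T>
{
    private final List<List<T>> pairs = new ArrayList<>();
    private T last;
    private boolean hasLast;

    @Override
    public void accept( final T value )
    {
        if( hasLast )
        {
            pairs.add( List.of( last, value ) );
        }
        last = value;
        hasLast = true;
    }

    public void combine( final PairCollector<T> other )
    {
        // sequential use only: a parallel merge would also need to pair
        // this.last with the first element seen by 'other'
        pairs.addAll( other.pairs );
        last = other.last;
        hasLast = other.hasLast;
    }

    public List<List<T>> pairs()
    {
        return pairs;
    }
}

It could then be used as Stream.of(0, 1, 2, 3, 4).collect(PairCollector::new, PairCollector::accept, PairCollector::combine).pairs().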

Bluster answered 22/6, 2017 at 17:31 Comment(0)
H
-1

This is an interesting problem. Is my hybrid attempt below any good?

public static void main(String[] args) {
    List<Integer> list = Arrays.asList(1, 2, 3);
    Iterator<Integer> first = list.iterator(); // supplies the first element of each pair
    list.stream()
        .skip(1)                               // supplies the second element of each pair
        .map(v -> new Pair(first.next(), v))
        .forEach(System.out::println);         // prints (1,2) and (2,3)
}

I believe it does not lend itself to parallel processing, and hence may be disqualified.

Hirsutism answered 2/3, 2016 at 22:5 Comment(1)
The question didn't ask for parallel processing, but it did assume that we only have a Stream, not a List. Of course, we can pry an iterator from a Stream as well, so this might be a valid solution. Nevertheless, it's an original approach.Caldera
I
-1

As others have observed, there is, due to the nature of the problem, some statefulness required.

I was faced with a similar problem, in which I wanted what was essentially the Oracle SQL function LEAD. My attempt to implement that is below.

/**
 * Stream that pairs each element in the stream with the next subsequent element.
 * The final pair will have only the first item, the second will be null.
 */
<T> Spliterator<Pair<T>> lead(final Stream<T> stream)
{
    final Iterator<T> input = stream.sequential().iterator();

    final Iterable<Pair<T>> iterable = () ->
    {
        return new Iterator<Pair<T>>()
        {
            Optional<T> current = getOptionalNext(input);

            @Override
            public boolean hasNext()
            {
                return current.isPresent();
            }

            @Override
            public Pair<T> next()
            {
                Optional<T> next = getOptionalNext(input);
                final Pair<T> pair = next.isPresent()
                    ? new Pair(current.get(), next.get())
                    : new Pair(current.get(), null);
                current = next;

                return pair;
            }
        };
    };

    return iterable.spliterator();
}

private <T> Optional<T> getOptionalNext(final Iterator<T> iterator)
{
    return iterator.hasNext()
        ? Optional.of(iterator.next())
        : Optional.empty();
}
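To consume the result as a standard stream, the returned spliterator can be wrapped, for example:

// java.util.stream.StreamSupport turns the spliterator back into a (sequential) Stream
Stream<Pair<Integer>> pairs =
    StreamSupport.stream(lead(Stream.of(0, 1, 2, 3, 4)), false);
pairs.forEach(System.out::println);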
Indent answered 19/10, 2016 at 21:17 Comment(0)
P
-1

You can achieve that by using a bounded queue to store elements flowing through the stream (this is based on the idea I described in detail here: Is it possible to get next element in the Stream?)

The example below first defines an instance of the BoundedQueue class, which will store elements going through the stream (if you don't like the idea of extending LinkedList, refer to the link mentioned above for an alternative, more generic approach). Then you just combine two subsequent elements into an instance of Pair:

public class TwoSubsequentElems {
  public static void main(String[] args) {
    List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4));

    class BoundedQueue<T> extends LinkedList<T> {
      public BoundedQueue<T> save(T curElem) {
        if (size() == 2) { // we need to know only two subsequent elements
          pollLast(); // remove last to keep only requested number of elements
        }

        offerFirst(curElem);

        return this;
      }

      public T getPrevious() {
        return (size() < 2) ? null : getLast();
      }

      public T getCurrent() {
        return (size() == 0) ? null : getFirst();
      }
    }

    BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>();

    final List<Pair<Integer>> answer = input.stream()
      .map(i -> streamHistory.save(i))
      .filter(e -> e.getPrevious() != null)
      .map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent()))
      .collect(Collectors.toList());

    answer.forEach(System.out::println);
  }
}
Puttyroot answered 8/11, 2016 at 12:20 Comment(0)
W
-3

I agree with @aepurniet, but instead of map you have to use mapToObj:

range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println);
Wetmore answered 28/5, 2015 at 6:21 Comment(2)
Right. But this simply collects pairs of integers, not pairs of elements of a stream (of any type).Caldera
It doesn't work for collection eg. fibonacci 1, 2, 3, 5, 8, 13...Fornof
A
-6

Run a for loop that runs from 0 to length-1 of your stream

for(int i = 0 ; i < stream.length-1 ; i++)
{
    Pair pair = new Pair(stream[i], stream[i+1]);
    // then add your pair to an array
}
Agler answered 9/12, 2013 at 12:11 Comment(4)
and where is the lambda part of the solution?Synn
That is not possible when the stream is infinitePope
@Olimpiu - Where did you get a lambda was a requirement? I read the question twice to make sure I was not missing it. I also checked history of edits. And the question is not tagged with it.Flashing
It should be deletedFornof
