How do I lazily concatenate streams?

I'm trying to implement a stream that uses another instance of itself in its implementation. The stream has a few constant elements prepended (with IntStream.concat) to it, so this should work as long as the concatenated stream creates the non-constant part lazily. I think using the StreamSupport.intStream overload taking a Supplier with IntStream.concat (which "creates a lazily concatenated stream") should be lazy enough to only create the second spliterator when elements are demanded from it, but even creating the stream (not evaluating it) overflows the stack. How can I lazily concatenate streams?


I'm attempting to port the streaming prime number sieve from this answer into Java. This sieve uses another instance of itself (ps = postponed_sieve() in the Python code). If I break the initial four constant elements (yield 2; yield 3; yield 5; yield 7;) into their own stream, it's easy to implement the generator as a spliterator:

/**
 * based on https://mcmap.net/q/57414/-how-to-implement-an-efficient-infinite-generator-of-prime-numbers-in-python
 */
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
    private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
    private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
    private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
    private int p, q, c = 9;
    private Supplier<IntStream> s;
    PrimeSpliterator() {
        super(105097564 /* according to Wolfram Alpha */ - 4 /* in prefix */,
                CHARACTERISTICS);
        //p = next(ps) and next(ps) (that's Pythonic?)
        postponedSieve.nextInt();
        this.p = postponedSieve.nextInt();
        this.q = p*p;
    }

    @Override
    public boolean tryAdvance(IntConsumer action) {
        for (; c > 0 /* overflow */; c += 2) {
            Supplier<IntStream> maybeS = sieve.remove(c);
            if (maybeS != null)
                s = maybeS;
            else if (c < q) {
                action.accept(c);
                return true; //continue
            } else {
                s = () -> IntStream.iterate(q+2*p, x -> x + 2*p);
                p = postponedSieve.nextInt();
                q = p*p;
            }
            int m = s.get().filter(x -> !sieve.containsKey(x)).findFirst().getAsInt();
            sieve.put(m, s);
        }
        return false;
    }
}

My first attempt at the primes() method returns an IntStream concatenating a constant stream with a new PrimeSpliterator:

public static IntStream primes() {
    return IntStream.concat(IntStream.of(2, 3, 5, 7),
            StreamSupport.intStream(new PrimeSpliterator(), false));
}

Calling primes() results in a StackOverflowError because primes() always instantiates a PrimeSpliterator, but PrimeSpliterator's field initializer always calls primes(). However, there's an overload of StreamSupport.intStream that takes a Supplier, which should allow lazily creating the PrimeSpliterator:

public static IntStream primes() {
    return IntStream.concat(IntStream.of(2, 3, 5, 7),
            StreamSupport.intStream(PrimeSpliterator::new, PrimeSpliterator.CHARACTERISTICS, false));
}

However, I instead get a StackOverflowError with a different backtrace (trimmed, as it repeats). Note that the recursion is entirely in the call to primes() -- the terminal operation iterator() is never invoked on a returned stream.

Exception in thread "main" java.lang.StackOverflowError
    at java.util.stream.StreamSpliterators$DelegatingSpliterator$OfInt.<init>(StreamSpliterators.java:582)
    at java.util.stream.IntPipeline.lazySpliterator(IntPipeline.java:155)
    at java.util.stream.IntPipeline$Head.lazySpliterator(IntPipeline.java:514)
    at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:352)
    at java.util.stream.IntPipeline.spliterator(IntPipeline.java:181)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
    at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
    at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
    at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
    at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
    at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
    at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
    at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
    at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
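
Reading the trace from the bottom up: IntStream.concat asks both inputs for their spliterators, and the ConcatSpliterator constructor calls estimateSize() on the second one, which makes the DelegatingSpliterator call the Supplier. A minimal standalone sketch of that behavior (the class ConcatEagerness and the values 11 and 13 are made up for illustration; the eager estimateSize() call is a JDK implementation detail visible in the trace, not a documented guarantee):

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class ConcatEagerness {
    // How many times the tail's Supplier has been asked for its spliterator.
    static final AtomicInteger supplierCalls = new AtomicInteger();

    // Same shape as the second primes() attempt: constant prefix + Supplier-based tail.
    static IntStream buildConcat() {
        Supplier<Spliterator.OfInt> tail = () -> {
            supplierCalls.incrementAndGet();
            return Spliterators.spliterator(new int[] {11, 13}, Spliterator.ORDERED);
        };
        // Spliterators.spliterator(int[], ...) also reports SIZED | SUBSIZED, and the
        // characteristics passed to intStream must match what the supplier returns.
        int characteristics = Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
        return IntStream.concat(IntStream.of(2, 3, 5, 7),
                StreamSupport.intStream(tail, characteristics, false));
    }

    public static void main(String[] args) {
        IntStream s = buildConcat();   // no terminal operation has run yet
        System.out.println("supplier calls before traversal: " + supplierCalls.get());
        System.out.println(Arrays.toString(s.toArray()));
        System.out.println("supplier calls after traversal: " + supplierCalls.get());
    }
}
```

On JDK 8, matching the trace, the counter is already 1 right after buildConcat() returns, even though no element has been consumed. In primes() that early call is what constructs the next PrimeSpliterator, and its constructor calls primes() again.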

How can I concatenate streams lazily enough to allow a stream to use another copy of itself in its implementation?

Slit answered 6/7, 2015 at 19:26 Comment(2)
@the8472 It's advanced twice in the constructor, so I don't see how it could be lazily initialized. (I think the question is still valid anyway, given how IntStream.concat is documented to be lazy.) - Slit
It doesn't pertain to the lazy stream, but x -> x + 2*p is likely a bug, since p is a member variable that might change before the lambda is evaluated. - Ranson
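
To illustrate the comment above: inside an instance method, a lambda that mentions a field captures `this` and reads the field when it runs, not when it is created. In the question's code this affects both `x -> x + 2*p` and `q+2*p` in the Supplier, because s.get() is only evaluated after p and q have been reassigned. A small sketch (the class CaptureDemo and its methods are hypothetical):

```java
import java.util.function.IntUnaryOperator;

final class CaptureDemo {
    private int p = 3;

    // Like `x -> x + 2*p` in PrimeSpliterator: the lambda captures `this`,
    // so it reads the current value of the field every time it runs.
    IntUnaryOperator liveStep() {
        return x -> x + 2 * p;
    }

    // Copying into an effectively final local freezes the value at creation time.
    IntUnaryOperator frozenStep() {
        int step = 2 * p;
        return x -> x + step;
    }

    void advance(int nextPrime) {
        p = nextPrime;
    }

    public static void main(String[] args) {
        CaptureDemo d = new CaptureDemo();
        IntUnaryOperator live = d.liveStep();
        IntUnaryOperator frozen = d.frozenStep();
        d.advance(5);                              // p changes after both lambdas exist
        System.out.println(live.applyAsInt(9));    // 19: uses the new p = 5
        System.out.println(frozen.applyAsInt(9));  // 15: still uses step = 6
    }
}
```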

You apparently assume that the Streams API extends its guarantees of laziness even to the instantiation of spliterators; this is not correct. It expects to be able to instantiate the stream's spliterator at any time before the actual consumption begins, for example just to find out the stream's characteristics and reported size. Consumption only begins by invoking trySplit, tryAdvance, or forEachRemaining.
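
A hypothetical ProbeSpliterator that counts metadata queries separately from consumption makes this distinction visible. It is a sketch for illustration only; exactly how often the JDK calls estimateSize() is an implementation detail (the question's trace shows concat calling it during construction).

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class ProbeSpliterator extends Spliterators.AbstractIntSpliterator {
    int sizeQueries;           // estimateSize() calls: metadata only
    int advances;              // tryAdvance() calls: actual consumption
    private final int end;
    private int next;

    ProbeSpliterator(int end) {
        super(end, Spliterator.ORDERED);   // emits 0, 1, ..., end - 1
        this.end = end;
    }

    @Override
    public long estimateSize() {
        sizeQueries++;
        return end - next;
    }

    @Override
    public boolean tryAdvance(IntConsumer action) {
        advances++;
        if (next >= end) {
            return false;
        }
        action.accept(next++);
        return true;
    }

    // Same shape as primes(): a constant prefix concatenated with a custom source.
    static IntStream withPrefix(ProbeSpliterator probe) {
        return IntStream.concat(IntStream.of(-1), StreamSupport.intStream(probe, false));
    }

    public static void main(String[] args) {
        ProbeSpliterator probe = new ProbeSpliterator(3);
        IntStream s = withPrefix(probe);
        System.out.println("before: sizeQueries=" + probe.sizeQueries + ", advances=" + probe.advances);
        s.forEach(x -> { });
        System.out.println("after:  sizeQueries=" + probe.sizeQueries + ", advances=" + probe.advances);
    }
}
```

Before the terminal operation the spliterator has already been queried but never advanced, so anything expensive (like creating the postponed sieve) belongs in tryAdvance, not in the constructor.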

With that in mind, note that you initialize the postponed sieve earlier than you need it: you don't use any of its results until the else if part of tryAdvance. So move that code to the latest point that still gives correct results (postponedSieve then becomes a non-final field that starts out null, and the constructor no longer touches it):

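@Override
public boolean tryAdvance(IntConsumer action) {
    for (; c > 0 /* overflow */; c += 2) {
        Supplier<IntStream> maybeS = sieve.remove(c);
        if (maybeS != null)
            s = maybeS;
        else {
            if (postponedSieve == null) {   // first time the inner sieve is needed
                postponedSieve = primes().iterator();
                postponedSieve.nextInt();
                this.p = postponedSieve.nextInt();
                this.q = p*p;
            }
            if (c < q) {
                action.accept(c);
                c += 2;                      // return skips the loop's c += 2
                return true; //continue
            } else {
                int start = q + 2*p, step = 2*p;   // snapshot before p and q change
                s = () -> IntStream.iterate(start, x -> x + step);
                p = postponedSieve.nextInt();
                q = p*p;
            }
        }
        // note: s.get() re-creates the progression from start on every call
        int m = s.get().filter(x -> !sieve.containsKey(x)).findFirst().getAsInt();
        sieve.put(m, s);
    }
    return false;
}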
I think that, with this change, even your first attempt at primes() should work.
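
Apart from where the inner sieve is created, the port seems to differ from the Python generator in a few other ways: c is not advanced before `return true`, the lambdas read p and q after they change, and re-creating the multiples from a Supplier restarts them instead of resuming like Python's count(). The following is a self-contained sketch (not the answerer's code; the class names PostponedSieve and Tail are made up) that combines the deferred initialization with one stateful iterator per base prime:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class PostponedSieve {
    static IntStream primes() {
        // The first attempt's shape: constructing Tail does no work, so this is not recursive.
        return IntStream.concat(IntStream.of(2, 3, 5, 7),
                StreamSupport.intStream(new Tail(), false));
    }

    /** Primes from 11 upward, following the Python postponed_sieve loop. */
    private static final class Tail extends Spliterators.AbstractIntSpliterator {
        // composite -> the stateful iterator over odd multiples that produced it
        private final Map<Integer, PrimitiveIterator.OfInt> sieve = new HashMap<>();
        private PrimitiveIterator.OfInt ps;   // inner sieve, created on first tryAdvance
        private int p, q, c = 9;

        Tail() {
            // Long.MAX_VALUE = size unknown. SORTED is left out: it would also
            // require overriding getComparator() to return null.
            super(Long.MAX_VALUE, Spliterator.DISTINCT | Spliterator.IMMUTABLE
                    | Spliterator.NONNULL | Spliterator.ORDERED);
        }

        @Override
        public boolean tryAdvance(IntConsumer action) {
            if (ps == null) {                  // p = next(ps) and next(ps)
                ps = primes().iterator();
                ps.nextInt();                  // 2
                p = ps.nextInt();              // 3
                q = p * p;                     // 9
            }
            for (; c > 0; c += 2) {            // c > 0 guards against int overflow
                PrimitiveIterator.OfInt s = sieve.remove(c);
                if (s == null) {
                    if (c < q) {               // not a multiple of any smaller prime
                        action.accept(c);
                        c += 2;                // the return below skips the loop increment
                        return true;
                    }
                    // c == q == p*p: odd multiples of p, starting at q + 2p
                    int step = 2 * p;          // local copy, so the lambda never sees a later p
                    s = IntStream.iterate(q + step, x -> x + step).iterator();
                    p = ps.nextInt();
                    q = p * p;
                }
                // for m in s: if m not in sieve: break
                int m = s.nextInt();
                while (sieve.containsKey(m)) {
                    m = s.nextInt();
                }
                sieve.put(m, s);
            }
            return false;
        }
    }

    public static void main(String[] args) {
        primes().limit(10).forEachOrdered(System.out::println);
    }
}
```

Because Tail's constructor does no work, plain IntStream.concat is enough here; each nested sieve is only driven past the constant prefix when its parent first needs a base prime beyond 7.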

If you want to stay with your current approach, you could use the following idiom:

Stream.<Supplier<IntStream>>of(
  ()->IntStream.of(2, 3, 5, 7),
  ()->StreamSupport.intStream(new PrimeSpliterator(), false))
.flatMapToInt(Supplier::get);

You may find that this gives you as much laziness as you need.
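
A small sketch of that idiom that records when the second supplier runs (lazyConcat is a hypothetical helper name). It uses flatMapToInt rather than flatMap because the suppliers produce IntStreams.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class SupplierConcat {
    // Hypothetical helper: the outer Stream holds only the suppliers, and
    // flatMapToInt calls get() on each one when the traversal reaches it.
    static IntStream lazyConcat(Supplier<IntStream> head, Supplier<IntStream> tail) {
        return Stream.of(head, tail).flatMapToInt(Supplier::get);
    }

    public static void main(String[] args) {
        AtomicInteger tailCalls = new AtomicInteger();
        IntStream s = lazyConcat(() -> IntStream.of(2, 3, 5, 7), () -> {
            tailCalls.incrementAndGet();
            return IntStream.of(11, 13);
        });
        System.out.println("tail calls before traversal: " + tailCalls.get());
        System.out.println(Arrays.toString(s.toArray()));
        System.out.println("tail calls after traversal: " + tailCalls.get());
    }
}
```

As the comments below point out, the price is that the result no longer reports a known size: its spliterator's getExactSizeIfKnown() returns -1, whereas concat of two sized streams reports the sum.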

Alcmene answered 6/7, 2015 at 20:18 Comment(14)
I'm sorry, I don't understand. The Python code begins by advancing the second sieve twice: p = next(ps) and next(ps), which your suggestion omits. (Also, while this approach might solve my specific problem, it doesn't resolve my confusion about IntStream.concat claiming to be lazy, though maybe I should ask a separate question with no code about that.) Not being a big Python programmer, perhaps I'm just confused about what the source does, but there is a question in the comments there and the answer is "it only works because it's lazy". - Slit
It doesn't omit it; it moves it to a different place. - Alcmene
Okay, I guess you edited it; that makes more sense. I'm still confused about stream laziness, but I'll ask another question about it if it ever comes up again. - Slit
No, this is the first and only version of the code I posted. - Alcmene
Have you considered using the idiom Stream.of(stream1, stream2).flatMap(identity()), which is semantically equivalent to concat but could be lazier? - Alcmene
That's lazy enough, but now primes().limit(5).forEachOrdered(System.out::println); does not terminate (in a reasonable time) after printing the first five primes. I think it's a bug in my sieve implementation, as I haven't previously been able to test it. (I wouldn't have thought of that one; thanks for the suggestion.) - Slit
The flatMap is not lazier in this case. Yes, unlike Stream.concat, flatMap does not retrieve a stream's spliterator until the traversal reaches that point. But in this case the spliterators for both streams already exist at the very start, so it doesn't matter. Also, the flatMap-ped stream does not report its size (unlike the concatenated stream), so the 105097564 constant becomes useless. - Farthest
@TagirValeev Flatmapping a stream of suppliers of streams should fix that (as in my edited example). Propagating the reported size is tougher because it's a chicken-and-egg problem: you can't find out the reported size before you instantiate the spliterator. - Alcmene
All told, for the specific problem I'd recommend the first solution proposed in my answer. More generally, Java streams just aren't designed to support the OP's idiom, because instantiating a spliterator is not considered consumption. Only calling tryAdvance/forEachRemaining counts as consumption, and the OP's code violates that. - Alcmene
Any flatMap-based solution is doomed to fail. Besides that, the logic doesn't seem to be fundamentally different from this much simpler solution, and the added complexity and low-level operation don't seem to provide a performance benefit either. I agree with Marko Topolnik that Java streams are not designed for this. However, if you still want to try to make them do that, you should strive for simplicity… - Augment
@Augment Whichever variant was considered so far, I think that parallelizing it would fail. And, unfortunately for some, the design of Java streams is bent quite a bit towards the needs of parallelization. We could almost argue that everything else is a lucky extra, just like with defender methods. - Alcmene
@Marko Topolnik: well, everyone who tries to find primes with the Stream API, rather than a simple loop and a BitSet for ints or BigInteger.nextProbablePrime for larger values, seems to want an exercise in functional programming rather than a simple pragmatic solution. In that regard, the Stream API does everything right by throwing exactly the obstacles into the programmer's way that (s)he is seeking. - Augment
@Augment I appreciate the sarcasm :) Still, an FP library with a different focus would probably take more care to respect lazy semantics. OP could be better off with the Functional Java library if his goal is acquiring some practice with pure FP idioms. - Alcmene
@Holger: I'd say a floating sieve (dictionary) logic is fundamentally different from trial division. ;o) - Corazoncorban

I like to use Supplier to do that:

return Stream.<Supplier<Stream<WhatEver>>>of(
  () -> generateStreamOfWhatEverAndChangeSomeState(input, state),
  () -> generateStreamOfMoreWhatEversDependendingOnMutatedState(state)
).flatMap(Supplier::get);

Because the stream is evaluated lazily and flatMap calls each Supplier only when the traversal reaches it, the elements from generateStreamOfWhatEverAndChangeSomeState() are processed before generateStreamOfMoreWhatEversDependendingOnMutatedState() is even called, so the state has already been updated.

I should note that this is probably not what the designers of Stream had in mind. Ideally, a Stream should not change state; it should only read each item and produce a new one.
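
A concrete, runnable version of this pattern (the class StateThenRead and its data are made up for illustration). It relies on sequential execution and on a side effect inside peek, which is exactly what the caveat above warns about; in a parallel stream the two suppliers could run concurrently.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StateThenRead {
    static List<String> run() {
        AtomicInteger seen = new AtomicInteger();   // the mutable "state"
        return Stream.<Supplier<Stream<String>>>of(
                // first part: counts its elements as they flow through
                () -> Stream.of("a", "b", "c").peek(x -> seen.incrementAndGet()),
                // second part: only created after the first part has been traversed
                () -> Stream.of("seen " + seen.get())
        ).flatMap(Supplier::get).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());   // [a, b, c, seen 3]
    }
}
```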

Bobettebobina answered 20/6, 2022 at 7:59 Comment(0)

@selalerercapitolis's answer was very helpful for my use case, which has nothing to do with mutation: to get elements from the concatenated stream until one of the elements doesn't have a certain property.

Example:

Stream<String> concatenated = Stream.of(supplierA, supplierB).flatMap(Supplier::get); // selalerercapitolis's answer.
List<String> result = concatenated.takeWhile(s -> s.length() > 5).toList();

supplierB generates a stream that is somewhat expensive to create, so I wanted to avoid creating it whenever one of the elements generated by supplierA already fails the condition tested by takeWhile(), that is, whenever result contains only elements generated by supplierA.
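
A runnable sketch of that use case (the words and the class TakeWhileConcat are invented for illustration; takeWhile needs JDK 9 or later). The second supplier is never called because takeWhile has already requested cancellation by the time the traversal would reach it.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TakeWhileConcat {
    // Counts how often the "expensive" second supplier actually runs.
    static final AtomicInteger expensiveCalls = new AtomicInteger();

    static List<String> longWordsPrefix() {
        Supplier<Stream<String>> supplierA = () -> Stream.of("alphabet", "elephant", "cat", "giraffes");
        Supplier<Stream<String>> supplierB = () -> {
            expensiveCalls.incrementAndGet();   // stands in for costly setup
            return Stream.of("hippopotamus");
        };
        return Stream.of(supplierA, supplierB)
                .flatMap(Supplier::get)
                .takeWhile(s -> s.length() > 5)   // JDK 9+
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(longWordsPrefix());                     // [alphabet, elephant]
        System.out.println("supplierB calls: " + expensiveCalls.get());
    }
}
```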

Grisly answered 4/9, 2023 at 2:1 Comment(1)
Method takeWhile was added to interface java.util.stream.Stream in JDK 9, which came out about two years after the original question was posted. I think you should mention that in your answer since (at the time of writing this comment) many people are still using JDK 8 (which, of course, doesn't have method takeWhile). - Decant
