Why filter() after flatMap() is "not completely" lazy in Java streams?

I have the following sample code:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

The output is as follows:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

From this I see that in the first case the stream really behaves lazily: we use findFirst(), so once we have the first element, our filtering lambda is not invoked further. However, in the second case, which uses flatMaps, we see that even though the first element fulfilling the filter condition is found (it's just any first element, as the lambda always returns true), further contents of the stream are still fed through the filtering function.

I am trying to understand why the stream behaves like this rather than giving up after the first element is produced, as in the first case. Any helpful information would be appreciated.

Phoenicia asked 24/3, 2015 at 9:46 Comment(10)
I really don't get your problem. Why should it behave differently? – Anion
@PhilippSander: Because if it behaved lazily, as it does in the first case, it would only evaluate the filter once. – Rate
Ah... it clicked when I read Marko's answer. – Anion
Note that you could also use peek: Stream.of(1, 2, 3).peek(System.out::println).filter(i -> true)... – Agustinaah
Note that I created a general workaround. – Fotheringhay
An OpenJDK bug was raised for this on the day this question was asked: bugs.openjdk.java.net/browse/JDK-8075939. It has been assigned, but still not fixed, almost a year later :( – Nipping
@Nipping JDK-8075939 is targeted for Java 10. Cf. mail.openjdk.java.net/pipermail/core-libs-dev/2017-December/… for the core-libs-dev review thread and a link to the first webrev. – Coh
Update: this issue was fixed in Java 10 and backported to Java 8u66. You should no longer encounter this issue; if you do, you should upgrade to the latest minor version of Java. – Nipping
@Nipping It was backported to Java 8u222, not 8u66. – Malleus

TL;DR: this has been addressed in JDK-8075939 and fixed in Java 10 (and backported to Java 8 via JDK-8225328).

Looking into the implementation (ReferencePipeline.java), we see the method:

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

which will be invoked for the findFirst operation. The special thing to note is sink.cancellationRequested(), which allows ending the loop on the first match. Compare this to:

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

The method that advances by one item ends up calling forEach on the sub-stream, without any possibility of earlier termination, and the comment at the beginning of the flatMap method even mentions this absent feature.

Since this is more than just an optimization issue, as it implies that the code simply breaks when the sub-stream is infinite, I hope that the developers soon prove that they "can do better than this"…


To illustrate the implications, while Stream.iterate(0, i->i+1).findFirst() works as expected, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() will end up in an infinite loop.
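As the TL;DR above notes, the developers eventually did prove it: the fix for JDK-8075939 replaces the unconditional forEach with a pull-based loop once short-circuiting comes into play. In rough outline (a simplified sketch of the idea, not the exact JDK code), the accept method becomes:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
                // fast path: no downstream stage has asked about cancellation yet
                result.sequential().forEach(downstream);
            } else {
                // pull one element at a time, stopping as soon as the
                // downstream sink signals cancellation
                Spliterator<? extends R> s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}

Here cancellationRequestedCalled stands for a flag that the real implementation sets once a downstream operation polls for cancellation.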

Regarding the specification, most of it can be found in the chapter “Stream operations and pipelines” of the package specification:

Intermediate operations return a new stream. They are always lazy;

… Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)

Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.

It’s clear that a short-circuiting operation doesn’t guarantee termination in finite time, e.g. when a filter doesn’t match any item the processing can’t complete, but an implementation which doesn’t support any termination in finite time, by simply ignoring the short-circuiting nature of an operation, is far outside the specification.

Fotheringhay answered 24/3, 2015 at 11:4 Comment(12)
This is a bug. While it might be true that the specification supports this behavior, no one expects that getting the first element of an infinite stream will throw a StackOverflowError or end up in an infinite loop, no matter whether it comes directly from the source of the pipeline or from a nested stream via a mapping function. This should be reported as a bug. – Preamble
This is an interesting analysis of the code itself. But do you have any thoughts on whether this should be expected behaviour based on the API guarantees? I found your comment on the other answer interesting and was hoping your answer would contain a more general discussion too. – Lainelainey
Strange that there is not much around the internet in this regard. I can't believe nobody spotted this before me. I was trying to solve a combinatorial problem for which I hoped not to compute the whole exploration space, relying on lazy stream behaviour. But it appears that is not possible because of this. Should I file this as a bug? I will try to solve it in Scala then. – Phoenicia
@Vadym S. Khondar: filing a bug report is a good idea. Regarding why nobody spotted this before, I have seen a lot of "can't believe I'm the first one to notice this" kinds of bugs before. Unless infinite streams are involved, this bug has only performance impacts, which might stay unnoticed in a lot of use cases. – Fotheringhay
"Intermediate operations return a new stream. They are always lazy;" – this continues with "Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed." That property has not been violated. "Laziness also allows avoiding examining..." doesn't specify any further guarantees; it is just an explanation. "An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result." – again, not violated. – Coorg
@MarkoTopolnik: the property "does not begin until the terminal operation of the pipeline is executed" does not negate other properties of lazy operations. I know that there is no single-sentence declaration of the discussed property, otherwise I would have cited it. Within the Stream API doc it is said that "Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed." – Fotheringhay
You may question again whether this implies a lazy-execution guarantee regarding short-circuiting; however, I tend to see it the other way round: at no point is it said that implementations are free to implement the non-lazy behavior we see here. And the specification is very exhaustive regarding what is permissible and what is not. – Fotheringhay
Arguably, the "source elements" here are what the OP supplied: 1, 2, 3. Nothing is explicitly said either way about the nested streams. Specifically, nothing is said about what would constitute a terminal operation for the nested stream. The passing of a nested stream to filter may constitute a terminal operation for it. Note that I am with you on all this being counterintuitive; I just have enough experience with Streams that I've learned what to expect from them, and here I actually expected eager consumption of the nested stream (or at least wasn't at all surprised to see it). – Coorg
@Fotheringhay almost two years later and I feel like a total idiot for not seeing this; do you know whether this has been reported as a bug or not? Because I can still reproduce it in jdk-9... – Giddy
@Eugene: there seems to be no activity in JDK-8075939 at the moment. Note this workaround. – Fotheringhay
JDK-8075939 is making progress now. See mail.openjdk.java.net/pipermail/core-libs-dev/2017-December/… for the core-libs-dev review thread and a link to the first webrev. It appears we'll see it in Java 10. – Coh
@StefanZobel Looks like it has been backported to Java 8 too. – Malleus

The elements of the input stream are consumed lazily, one by one. The first element, 1, is transformed by the two flatMaps into the stream -1, 0, 1, 0, 1, 2, 1, 2, 3, so that this entire stream corresponds to just the first input element. The nested streams are eagerly materialized by the pipeline, then flattened, and then fed to the filter stage. This explains your output.
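You can see this expansion directly by sending just the first source element through the two flatMap stages from the question; a minimal sketch, which prints exactly the nine values from your output:

Stream.of(1)
        .flatMap(i -> Stream.of(i - 1, i, i + 1)) // 0, 1, 2
        .flatMap(i -> Stream.of(i - 1, i, i + 1)) // -1, 0, 1, 0, 1, 2, 1, 2, 3
        .forEach(System.out::println);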

The above does not stem from a fundamental limitation, but it would probably make things much more complicated to get full-blown laziness for nested streams. I suspect it would be an even greater challenge to make it performant.

For comparison, Clojure's lazy seqs get another layer of wrapping for each such level of nesting. Due to this design, the operations may even fail with StackOverflowError when nesting is exercised to the extreme.

Coorg answered 24/3, 2015 at 9:55 Comment(2)
@MarkoTopolnik, thanks for your reply. Actually, the concern raised by Holger is the reason for my surprise. Does the second case mean that I can't use flatMap for infinite streams? – Phoenicia
Yes, I bet that the nested stream cannot be infinite. – Coorg

With regard to breakage with infinite sub-streams, the behavior of flatMap becomes even more surprising when one throws in an intermediate (as opposed to terminal) short-circuiting operation.

While the following works as expected, printing out the infinite sequence of integers:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

the following code prints out only the "1", but still does not terminate:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

I cannot imagine a reading of the spec in which that is not a bug: the limit stage requests cancellation after the first element, but the flatMap's inner forEach never polls for that cancellation, as the implementation analysis above shows.

Parthia answered 1/5, 2015 at 11:28 Comment(0)

In my free StreamEx library I introduced short-circuiting collectors. When collecting a sequential stream with a short-circuiting collector (like MoreCollectors.first()), exactly one element is consumed from the source. Internally this is implemented in a quite dirty way: using a custom exception to break the control flow. Using my library, your sample could be rewritten this way:

System.out.println(
        "Result: " +
                StreamEx.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .collect(MoreCollectors.first())
                .get()
        );

The result is the following:

-1
Result: -1
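
For the curious, the "dirty" exception trick can be sketched as follows; this is a hypothetical illustration of the technique, not StreamEx's actual code, and assumes the usual java.util imports:

// A private control-flow exception used solely to abort the traversal.
class StopException extends RuntimeException {}

static <T> Optional<T> firstOf(Stream<T> stream) {
    AtomicReference<T> holder = new AtomicReference<>();
    try {
        stream.forEach(t -> {
            holder.set(t);
            throw new StopException(); // unwinds out of any nested forEach calls
        });
    } catch (StopException expected) {
        // the first element was captured; traversal stopped immediately
    }
    return Optional.ofNullable(holder.get());
}

Because the exception also propagates out of flatMap's internal result.sequential().forEach(downstream) call, only one element of the nested streams is ever consumed.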
Canossa answered 21/11, 2015 at 3:58 Comment(0)

While JDK-8075939 has been fixed in Java 10 and backported to 8u222, there's still an edge case where flatMap() is not truly lazy when using Stream.iterator(): JDK-8267359, still present in Java 17.

This code:

Iterator<Integer> it =
    Stream.of("a", "b")
        .flatMap(s -> Stream
            .of(1, 2, 3, 4)
            .filter(i -> { System.out.println(i); return true; }))
        .iterator();

it.hasNext(); // this consumes the entire inner stream produced for "a"
it.next();

prints:

1
2
3
4

While this:

Iterator<Integer> it =
    Stream.of("a", "b")
        .flatMap(s -> Stream
            .iterate(1, i -> i)
            .filter(i -> { System.out.println(i); return true; }))
        .iterator();

it.hasNext();
it.next();

never terminates.

Ventilation answered 19/5, 2021 at 7:8 Comment(0)

I agree with the other people that this is a bug, reported as JDK-8075939. And since it's still not fixed more than one year later, I would like to recommend abacus-common:

N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get());

N.println("-----------");

N.println("Result: " + Stream.of(1, 2, 3)
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .peek(N::println).first().get());

// output:
// 1
// Result: 1
// -----------
// -1
// Result: -1

Disclosure: I'm the developer of abacus-common.

Farseeing answered 7/12, 2016 at 19:17 Comment(0)

Unfortunately, .flatMap() is not lazy. However, a custom flatMap workaround is available here: Why .flatMap() is so inefficient (non lazy) in java 8 and java 9
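
The usual idea behind such workarounds is to flatten via iterators, pulling one inner element at a time instead of pushing the whole inner stream. A minimal sketch of that approach (a hypothetical lazyFlatMap helper, not the code behind the link; it ignores closing the inner streams):

static <T, R> Stream<R> lazyFlatMap(Stream<T> source,
        Function<? super T, ? extends Stream<? extends R>> mapper) {
    Iterator<T> outer = source.iterator();
    Iterator<R> flat = new Iterator<R>() {
        Iterator<? extends R> inner = Collections.emptyIterator();

        @Override
        public boolean hasNext() {
            // advance to the next non-empty inner iterator, one element at a time
            while (!inner.hasNext() && outer.hasNext()) {
                inner = mapper.apply(outer.next()).iterator();
            }
            return inner.hasNext();
        }

        @Override
        public R next() {
            if (!hasNext()) throw new NoSuchElementException();
            return inner.next();
        }
    };
    return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(flat, Spliterator.ORDERED), false);
}

With this helper, lazyFlatMap(Stream.of("x"), x -> Stream.iterate(1, i -> i + 1)).findFirst() terminates, because only the first element of the infinite inner stream is ever pulled.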

Cornetist answered 10/10, 2017 at 20:46 Comment(0)

Today I also stumbled upon this bug. The behavior is not so straightforward: a simple case like the one below works fine, but similar production code doesn't.

 stream(spliterator).map(o -> o).flatMap(Stream::of).flatMap(Stream::of).findAny()

For those who cannot wait another couple of years for the migration to JDK 10, there is an alternative, truly lazy stream. It doesn't support parallel execution. It was dedicated to JavaScript translation, but it worked for me, because the interface is the same.

StreamHelper is collection-based, but it is easy to adapt a Spliterator.

https://github.com/yaitskov/j4ts/blob/stream/src/main/java/javaemul/internal/stream/StreamHelper.java

Mindoro answered 7/11, 2019 at 22:40 Comment(0)
