How to lazily evaluate nested flatMap

I'm trying to build a Cartesian product of two potentially infinite streams and then truncate it with limit().

This has been (approximately) my strategy so far:

@Test
void flatMapIsLazy() {
    Stream.of("a", "b", "c")
        .flatMap(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                .mapToObj(sd::repeat)))
        .map(s -> s + "u")
        .limit(20)
        .forEach(System.out::println);
}

This doesn't work.

Apparently, the second stream is evaluated eagerly, in full, the first time the pipeline touches it. It doesn't produce a lazy stream that I can then consume at my own pace.

I think the .forEach in this piece of code from ReferencePipeline#flatMap is to blame:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
                result.sequential().forEach(downstream);
            }
            else {
                var s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}

I expected the above code to return 20 elements looking like:

a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx

But instead it crashes with an OutOfMemoryError, since the very long stream of the nested flatMap is evaluated eagerly (??) and fills up my memory with unnecessary copies of the repeated strings. If a value of 3 were provided instead of Integer.MAX_VALUE, with the limit still at 20, the expected output would instead be:

a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)
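
A bounded variant of the test can make the behaviour observable without exhausting memory. The sketch below is only an illustration and not part of the original test: the class name NestedFlatMapProbe, the INNER_BOUND constant and the generated counter are assumptions introduced here. It counts how many inner elements the pipeline actually pulls. A count close to the limit would indicate lazy evaluation, while a count near or above INNER_BOUND would indicate that an inner stream was drained.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical probe class, introduced only for this illustration.
final class NestedFlatMapProbe {

    // Assumption: a small bound replaces Integer.MAX_VALUE so the run always terminates.
    static final int INNER_BOUND = 1_000;

    // Counts every int pulled from the innermost range.
    static final AtomicInteger generated = new AtomicInteger();

    static List<String> firstN(int n) {
        generated.set(0);
        return Stream.of("a", "b", "c")
            .flatMap(s -> Stream.of("x", "y")
                .flatMap(sd -> IntStream.rangeClosed(0, INNER_BOUND)
                    .peek(i -> generated.incrementAndGet())
                    .mapToObj(sd::repeat)))
            .map(s -> s + "u")
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = firstN(5);
        System.out.println(result);
        System.out.println("inner elements generated: " + generated.get());
    }
}
```

The reported count depends on the JDK version in use, so no expected number is given here.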

Edit: At this point I have just rolled my own implementation with lazy iterators. Still, I think there should be a way to do this with pure Streams.
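
For readers wondering what an iterator-based approach might look like, here is a minimal sketch. It is not the implementation mentioned in the edit above, which was never posted; LazyFlatMap, lazyFlatMap and firstN are hypothetical names introduced for illustration. The idea is to pull each inner stream through its Iterator one element at a time and wrap the result back into a Stream. Inner streams are not closed in this sketch.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper; not part of the JDK and not the asker's actual code.
final class LazyFlatMap {

    // Flattens lazily: the next inner stream is only opened once the current one is exhausted.
    static <T, R> Stream<R> lazyFlatMap(Stream<T> source,
                                        Function<? super T, ? extends Stream<? extends R>> mapper) {
        Iterator<T> outer = source.iterator();
        Iterator<R> flattened = new Iterator<>() {
            private Iterator<? extends R> inner = null;

            @Override
            public boolean hasNext() {
                // Advance to the next non-empty inner iterator, if any.
                while (inner == null || !inner.hasNext()) {
                    if (!outer.hasNext()) {
                        return false;
                    }
                    inner = mapper.apply(outer.next()).iterator();
                }
                return true;
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return inner.next();
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(flattened, Spliterator.ORDERED), false);
    }

    // Same shape as the test in the question, with flatMap replaced by lazyFlatMap.
    static List<String> firstN(int n) {
        Function<String, Stream<String>> repeats =
            sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE).mapToObj(sd::repeat);
        Function<String, Stream<String>> inner =
            s -> lazyFlatMap(Stream.of("x", "y"), repeats);
        return lazyFlatMap(Stream.of("a", "b", "c"), inner)
            .map(s -> s + "u")
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        firstN(20).forEach(System.out::println);
    }
}
```

The explicitly typed Function variables are a design choice to keep type inference simple; inlining the lambdas should also be possible.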

Edit 2: This has been accepted as a bug in Java: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758

Sandra answered 24/5, 2021 at 4:55 Comment(11)
Stream size aside, have you tried running just "x".repeat(Integer.MAX_VALUE) once? On my machine, I get an OOM. Maybe this is just a bad example you have here, but you can't expect it to work. - Irreconcilable
Besides the above, .flatMap(s -> second) can't work. You're trying to reuse a stream. That will almost certainly give you an IllegalStateException. - Irreconcilable
Hi @ernest_k, thanks for your comments! If the stream were lazily evaluated, there wouldn't be an actual call to String#repeat() with Integer.MAX_VALUE as an argument. The repeated strings would be generated one by one as needed. About stream reuse: which stream instance do you see being reused? Oh, I get it, yeah. I'll update that one, thanks. - Sandra
There. No stream reuse now. :) - Sandra
I guess now you just have to try this version of the code. - Irreconcilable
A version of the code truer to your original query could be: Stream.of("a", "b", "c").flatMap(s -> Stream.of("x", "y").flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE).mapToObj(sd::repeat))).map(s -> s + "u").limit(20).forEach(System.out::println);, which causes an OOM. Note that it has nested flatMap calls. - Irreconcilable
The code in the question is an executable example I wrote as a test, specifically to show the problem. The actual code is spread across various classes and does not reuse streams, but the problem remains. Java Streams simply don't seem to support lazy evaluation in this way. Also, I always run all the code before uploading it here, of course. ;) I'm rolling my own lazy Iterator implementation now and it's doing wonders, but I'd rather be able to use the Stream API instead. - Sandra
I meant that the current code you have in the post does work (i.e., it lazily evaluates your nested stream), which means your problem isn't showcased by your example. The code I added two comments ago is an adaptation of your original code, showing where and how it fails. The difference is that it has nested flatMap calls, which may be what you need answers about. - Irreconcilable
@Irreconcilable Yup, that's it. I changed the question code! Thanks! :) - Sandra
I'm assuming you would supply some value (like an integer to the limit method) to govern the size of the output. It would be useful to see the expected output for several such values. - Salsbury
Yes, that's the idea. I will update the question with input-output examples. - Sandra

As you have already written, this has been accepted as a bug. Maybe it will be fixed in a future version of Java.

But there is a workaround available even now. It is not very elegant, and it may be viable only if the number of elements in the outer stream and the limit are small enough. But it will work under these restrictions.

Let me first modify your example slightly by splitting the outer flatMap into two operations: a map, followed by a flatMap with the identity function that only flattens:

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s)
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

We can easily see that we need no more than 20 elements from each inner stream, so we can limit each inner stream to that many elements. This will work (you should use a variable or constant for the limit):

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s.limit(20))            // limit each inner stream
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

Of course this will still produce too many intermediate results, but that may not be a big problem under the above restrictions.
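
As a self-contained version of the snippet above, the following sketch passes the limit in as a parameter so the inner and outer caps cannot drift apart. The class name InnerLimitWorkaround and the method firstN are hypothetical names chosen for illustration, and the result is collected into a list rather than printed so it can be inspected. It relies on String.repeat, so it assumes Java 11 or later.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical wrapper class, introduced only for this illustration.
final class InnerLimitWorkaround {

    // The same limit caps each inner stream and the overall result.
    static List<String> firstN(int limit) {
        return Stream.of("a", "b", "c")
            .map(s -> Stream.of("x", "y")
                .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                    .mapToObj(sd::repeat)))
            .flatMap(s -> s.limit(limit))   // limit each inner stream
            .map(s -> s + "u")
            .limit(limit)                   // limit the flattened result
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        firstN(20).forEach(System.out::println);
    }
}
```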

Confined answered 6/6, 2021 at 12:35 Comment(1)
That's nice! In my real test code it would not work, because I flatMap and limit at very different places and stack depths: the inner and outer streams are generated in different classes. I could probably make the limit travel up and down through the calls, but that kind of defeats the purpose. Anyway, thank you for your time and your answer. - Sandra