How to implement a Java stream?

6

45

I want to implement a Stream<T>.

I don't want to just use implements Stream<T>, because I would have to implement a ton of methods.

Can this be avoided?

To be more concrete, how can I stream t1, t2 and t3 for example:

class Foo<T> {
    T t1, t2, t3;

    Foo(T t1, T t2, T t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }
}
Biggers answered 6/6, 2015 at 17:27 Comment(2)
Stream.of(t1, t2, t3). - Cadena
Stream.builder - Dopp
56

The JDK's standard implementation of Stream is the internal class java.util.stream.ReferencePipeline; you cannot instantiate it directly.

Instead you can use java.util.stream.Stream.builder(), java.util.stream.StreamSupport.stream(Spliterator<T>, boolean) and various other static factory methods to create an instance of the default implementation.
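
As a minimal sketch of the Stream.builder() route, applied to the class from the question (the class name BuilderFoo is made up here so it does not clash with the original Foo):

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical variant of the question's Foo; only the stream() method is new.
class BuilderFoo<T> {
    private final T t1, t2, t3;

    BuilderFoo(T t1, T t2, T t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }

    // Adds the three fields to a Stream.Builder and builds the JDK's default Stream.
    Stream<T> stream() {
        Stream.Builder<T> builder = Stream.builder();
        builder.add(t1).add(t2).add(t3);
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(new BuilderFoo<>("a", "b", "c").stream().collect(Collectors.toList()));
    }
}
```

The builder collects all elements eagerly before build() is called, so this suits small, already-known sets of values rather than lazily produced ones.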

Using a spliterator is probably the most powerful approach as it allows you to provide objects lazily while also enabling efficient parallelization if your source can be divided into multiple chunks.
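
A rough sketch of the spliterator route, again for the question's three fields. It extends Spliterators.AbstractSpliterator, which only requires tryAdvance; the class name SpliteratorFoo is hypothetical, and splitting for parallel use is left to the inherited default:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical variant of the question's Foo that hands out its fields lazily.
class SpliteratorFoo<T> {
    private final T t1, t2, t3;

    SpliteratorFoo(T t1, T t2, T t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }

    Stream<T> stream() {
        // 3 is the estimated size; ORDERED says elements come in encounter order.
        Spliterator<T> source =
                new Spliterators.AbstractSpliterator<T>(3, Spliterator.ORDERED) {
            private int next = 0;

            // Passes one element to the action per call; returns false when exhausted.
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                switch (next++) {
                    case 0: action.accept(t1); return true;
                    case 1: action.accept(t2); return true;
                    case 2: action.accept(t3); return true;
                    default: return false;
                }
            }
        };
        return StreamSupport.stream(source, false); // false = sequential stream
    }

    public static void main(String[] args) {
        System.out.println(new SpliteratorFoo<>(1, 2, 3).stream().collect(Collectors.toList()));
    }
}
```

For three fields this is overkill compared to Stream.of, but the same shape works when each element is expensive to produce or comes from an external source.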

You can also convert a stream back into a spliterator, wrap it in a custom spliterator, and convert the result back into a stream. This is useful if you need to implement your own stateful intermediate operation, e.g. to work around shortcomings in the standard APIs, since the functions passed to most built-in intermediate operations are not allowed to be stateful.
See this SO answer for an example.
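
This is not the example from the linked answer, just a small sketch of the wrapping idea: a stateful operation that drops consecutive duplicates. The names ConsecutiveDistinct and dedupeConsecutive are made up for illustration, and the result is sequential only:

```java
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ConsecutiveDistinct {
    // Hypothetical helper: emits an element only if it differs from the one before it.
    static <T> Stream<T> dedupeConsecutive(Stream<T> input) {
        Spliterator<T> source = input.spliterator();
        Spliterator<T> wrapped = new Spliterators.AbstractSpliterator<T>(
                source.estimateSize(), source.characteristics() & Spliterator.ORDERED) {
            private boolean hasPrevious = false;
            private T previous;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                boolean[] emitted = {false};
                // Keep pulling from the source until one element is passed on or it runs dry.
                while (!emitted[0] && source.tryAdvance(e -> {
                    if (!hasPrevious || !Objects.equals(previous, e)) {
                        hasPrevious = true;
                        previous = e;
                        action.accept(e);
                        emitted[0] = true;
                    }
                })) {
                    // loop body intentionally empty
                }
                return emitted[0];
            }
        };
        // Closing the returned stream also closes the original one.
        return StreamSupport.stream(wrapped, false).onClose(input::close);
    }

    public static void main(String[] args) {
        System.out.println(
                dedupeConsecutive(Stream.of(1, 1, 2, 2, 2, 3, 1)).collect(Collectors.toList()));
    }
}
```

The state (the previous element) lives inside the spliterator rather than in a lambda passed to filter(), which is what keeps it out of the way of the stateless-function requirement.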

In principle you could write your own implementation of the Stream interface, but that would be quite tedious.

Maenad answered 6/6, 2015 at 17:42 Comment(1)
But Spliterator isn't closable and my data source is :( - Pronounce
23

If you want to make your own Stream because you need custom close() logic, the simplest solution is to create a Stream from an Iterator and call onClose(Runnable). For instance, to stream from a Reader via Jackson:

MappingIterator<?> values = objectMapper.reader(type).readValues(reader);
return StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(values, Spliterator.ORDERED), false)
        .onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
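
One thing to keep in mind with this approach: terminal operations do not close a stream, so the onClose handler only runs when close() is called, typically via try-with-resources. A Jackson-free sketch of the same pattern (OnCloseDemo and streamOf are hypothetical names, and the AtomicBoolean stands in for a real resource):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class OnCloseDemo {
    // Wraps an iterator in a stream whose close() also closes the given resource.
    static <T> Stream<T> streamOf(Iterator<T> values, AutoCloseable resource) {
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(values, Spliterator.ORDERED), false)
                .onClose(() -> {
                    try {
                        resource.close();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false); // stand-in for a Reader, connection, ...
        Iterator<String> values = Arrays.asList("a", "b").iterator();
        List<String> result;
        // try-with-resources calls Stream.close(), which triggers the onClose handler.
        try (Stream<String> s = streamOf(values, () -> closed.set(true))) {
            result = s.collect(Collectors.toList());
        }
        System.out.println(result + " closed=" + closed.get());
    }
}
```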
Tandy answered 24/5, 2016 at 17:34 Comment(0)
18

You usually do not need to write your own stream class. Instead, you can create a stream with existing methods. For instance, here is how to create a stream of the values 1 to 100:

  AtomicInteger n = new AtomicInteger(0);
  Stream<Integer> stream = Stream.generate(() -> n.incrementAndGet()).limit(100);

Here we created an infinite stream of integers: 1, 2, 3, .... Then we applied limit(100) to that infinite stream to get back a stream of 100 elements.

To be clear, if you want a stream of integers at fixed intervals you should use IntStream.range(). This is just an example to show how streams can be defined using Stream.generate(), which gives you more flexibility as it allows you to use arbitrary logic for determining the stream's elements.
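
For comparison, here is a sketch of two ways to get the same 1 to 100 values without a shared mutable counter (the class name CountingStreams is made up). IntStream.rangeClosed includes its upper bound, and Stream.iterate derives each element from the previous one:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class CountingStreams {
    // 1..100 from a primitive range, boxed to Stream<Integer>.
    static Stream<Integer> viaRange() {
        return IntStream.rangeClosed(1, 100).boxed();
    }

    // 1..100 from an infinite iterate() stream, cut off with limit().
    static Stream<Integer> viaIterate() {
        return Stream.iterate(1, i -> i + 1).limit(100);
    }

    public static void main(String[] args) {
        boolean same = viaRange().collect(Collectors.toList())
                .equals(viaIterate().collect(Collectors.toList()));
        System.out.println("same elements: " + same);
    }
}
```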

Zobias answered 6/6, 2015 at 17:52 Comment(3)
IntStream.range(0,100) would be more efficient in this case. - Maenad
This can be shortened to Stream.generate(n::incrementAndGet).limit(100) - Helprin
Use n::getAndIncrement if you want the stream to start at 0. - Helprin
12

Others have answered how to provide a general-purpose Stream implementation. Regarding your specific requirement, just do this:

class Foo<T> {

    T t1, t2, t3;

    Foo(T t1, T t2, T t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }

    Stream<T> stream() {
        return Stream.of(t1, t2, t3);
    }
}
Calamint answered 8/12, 2015 at 20:59 Comment(1)
This answer is both concise and idiomatic (at least, it mimics the stream methods in the std lib). So I'd say it should be higher up. - Boneblack
11

For the sake of completeness, as I did not find this directly among the answers here on SO: If you want to transform an existing Iterator into a Stream (e.g., because you want to generate elements successively), use this:

StreamSupport.stream(
    Spliterators.spliterator(myIterator, /* initial size*/ 0L, Spliterator.NONNULL), 
    /* not parallel */ false);

This was a bit hard to find, as you need to know about StreamSupport, Spliterators and Spliterator.
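
As far as I read the Javadoc, the second argument of Spliterators.spliterator(Iterator, long, int) is the number of elements in the source, so when that number is not known up front, Spliterators.spliteratorUnknownSize looks like the safer choice. A sketch under that assumption (IteratorStreams and fromIterator are hypothetical names):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorStreams {
    // Wraps any iterator in a sequential stream without claiming a size.
    static <T> Stream<T> fromIterator(Iterator<T> iterator) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
                /* not parallel */ false);
    }

    public static void main(String[] args) {
        // An iterator that generates its elements successively: 1, 2, 4, 8, 16.
        Iterator<Integer> powersOfTwo = new Iterator<Integer>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                return next <= 16;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int current = next;
                next *= 2;
                return current;
            }
        };
        System.out.println(fromIterator(powersOfTwo).collect(Collectors.toList()));
    }
}
```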

Libbylibeccio answered 20/7, 2017 at 14:50 Comment(2)
This is useful, thanks! Note that the parameter you've called "initial size" is actually the stream's estimated size. Not exactly sure what that is used for but it seems to play a role in whether a stream is parallelisable and I think that passing 0 here rules out concurrency entirely. Spliterators.spliteratorUnknownSize(myIterator, Spliterator.NONNULL) would use Long.MAX_VALUE as the estimate instead. - Enzymology
@Enzymology As the docs say: "The spliterator is only traversed, split, or queried for estimated size after the terminal operation of the stream pipeline commences". So if spliterator's underlying iterator contains more elements than size value, they won't be acquired/get/collected. - Olds
0

While this is not a good solution for your example with finitely many values, as this question comes up in searches for how to implement a stream, it’s perhaps worthwhile to mention a simple way to implement an infinite stream, Stream.generate(Supplier), for example:

    return Stream.generate(() -> Math.random());
Schwinn answered 12/2 at 18:18 Comment(0)
