We had a similar problem to solve. We wanted to take a stream that was larger than system memory (iterating through all objects in a database) and randomise the order as well as we could; we decided it would be acceptable to buffer 10,000 items at a time and randomise each buffer.
The target was a function which took in a stream.
The solutions proposed here seem to fall into a few groups:
- Use additional libraries beyond plain Java 8
- Start with something that's not a stream, e.g. a random access list
- Have a stream whose spliterator can be split easily
Our instinct was originally to use a custom collector, but this meant dropping out of streaming. The custom collector solution above is very good and we nearly used it.
Here's a solution which cheats by using the fact that Streams can give you an Iterator, which you can use as an escape hatch to let you do something extra that streams don't support. The Iterator is converted back to a stream using another bit of Java 8 StreamSupport sorcery.
import static java.util.Spliterator.ORDERED;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, ORDERED),
            false);
    }

    private final int batchSize;
    private final Iterator<T> sourceIterator;
    private List<T> currentBatch;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        // Only read ahead when no batch is pending, so that calling
        // hasNext() repeatedly does not silently discard a batch.
        if (currentBatch == null) {
            prepareNextBatch();
        }
        return !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Hand the pending batch over and clear it, so the next call reads ahead again.
        List<T> batch = currentBatch;
        currentBatch = null;
        return batch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}
A simple example of using this would look like this:
@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}
The above prints
[A, B, C]
[D, E, F]
For our use case, we wanted to shuffle the contents of each batch and then carry on with a single flat stream. It looked like this:
@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because
        // Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list);
            return list;
        })
        .flatMap(List::stream)
        .forEach(System.out::println);
}
This outputs something like the following (it's randomised, so it differs between runs):
A
C
B
E
D
F
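Since the target was a function which took in a stream, the same idea can be wrapped up as a single helper. The following is only a sketch under a few assumptions: the names BatchShuffle and shuffledInBatches are made up for illustration, the batching is inlined as an anonymous iterator so the snippet compiles on its own, and the Random is passed in so a test can seed it. It is not the code we actually ran.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class BatchShuffle {
    /**
     * Hypothetical helper: returns a lazy stream that shuffles the source in
     * windows of at most bufferSize items. Only one window is buffered at a time.
     */
    static <T> Stream<T> shuffledInBatches(Stream<T> source, int bufferSize, Random random) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must be at least 1");
        }
        Iterator<T> items = source.iterator();
        Iterator<List<T>> batches = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                // Does not consume anything, so it is safe to call repeatedly.
                return items.hasNext();
            }

            @Override
            public List<T> next() {
                if (!items.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> batch = new ArrayList<>(bufferSize);
                while (items.hasNext() && batch.size() < bufferSize) {
                    batch.add(items.next());
                }
                Collections.shuffle(batch, random);
                return batch;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(batches, Spliterator.ORDERED), false)
            .flatMap(List::stream)
            // Closing the result also closes the source (useful for database-backed streams).
            .onClose(source::close);
    }

    public static void main(String[] args) {
        List<Integer> out = shuffledInBatches(IntStream.range(0, 10).boxed(), 4, new Random(42))
            .collect(Collectors.toList());
        // The first four values are some permutation of 0..3, the next four of 4..7, then 8 and 9.
        System.out.println(out);
    }
}
```

Note that items never move further than one window from where they started, so this is only a local shuffle. That was good enough for our purposes, but it is not a uniform shuffle of the whole stream.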
The secret sauce here is that there's always a stream, so you can either operate on a stream of batches, or do something to each batch and then flatMap it back to a stream. Even better, none of the above runs until the final forEach, collect or other terminal operation pulls the data through the stream.
It turns out that iterator is a special type of terminal operation on a stream: it does not cause the whole stream to run and come into memory! Thanks to the Java 8 guys for a brilliant design!
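One way to convince yourself of the laziness is to try it on a stream that never ends. The sketch below is illustrative only: the class name LazyIteratorDemo and the produced counter are invented for the example. It takes the iterator of an endless stream, wraps it back into a stream the same way asStream does, and reads just the first few items.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyIteratorDemo {
    /** Counts how many elements the source stream has actually produced. */
    static final AtomicInteger produced = new AtomicInteger();

    /** Reads the first n elements of an endless stream via iterator(). */
    static List<Integer> firstViaIterator(int n) {
        produced.set(0);
        Iterator<Integer> it = Stream.iterate(0, i -> i + 1)   // endless source
            .peek(i -> produced.incrementAndGet())
            .iterator();                                        // does not drain the source

        // Wrap the iterator back into a sequential stream.
        Stream<Integer> roundTripped = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);

        return roundTripped.limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstViaIterator(3)); // prints [0, 1, 2]
        System.out.println("elements produced by the source: " + produced.get());
    }
}
```

If iterator() had to run the whole stream first, this would never return. Instead the source is only asked for about as many elements as were requested.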
flatMap (+ an additional flatMap to collapse the streams again)? I don't think something like that exists as a convenient method in the standard library. Either you'll have to find a 3rd party lib or write your own based on spliterators and/or a collector emitting a stream of streams – Levanter
Stream.generate with reader::readLine and limit, but the problem is that streams don't go well with Exceptions. Also, this is probably not parallelizable well. I think the for loop is still the best option. – Monstrous