Is there an elegant way to process a stream in chunks?
Asked Answered

15

89

My exact scenario is inserting data into a database in batches, so I want to accumulate DOM objects and then, every 1000 of them, flush them.

I implemented it by putting code in the accumulator to detect fullness then flush, but that seems wrong - the flush control should come from the caller.

I could convert the stream to a List then use subList in an iterative fashion, but that too seems clunky.

Is there a neat way to take action every n elements and then continue with the stream, while only processing the stream once?

Edam answered 20/12, 2014 at 19:33 Comment(1)
For a similar use case I did this: bitbucket.org/assylias/bigblue-utils/src/… - not exactly what you are asking for though.Puissant
37

Elegance is in the eye of the beholder. If you don't mind using a stateful function in groupingBy, you can do this:

AtomicInteger counter = new AtomicInteger();

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
    .values()
    .forEach(database::flushChunk);

This doesn't win any performance or memory usage points over your original solution because it will still materialize the entire stream before doing anything.

If you want to avoid materializing the list, the Stream API will not help you. You will have to get the stream's iterator or spliterator and do something like this:

Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;

while(true) {
    List<Integer> chunk = new ArrayList<>(chunkSize);
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
    if (chunk.isEmpty()) break;
    database.flushChunk(chunk);
}
Walrath answered 22/12, 2014 at 2:15 Comment(1)
Nice spliterator solution! The variant with collect/groupingBy is terminating the stream and this is not a good option for large streams.Dye
33

Most of the answers above do not preserve the stream's benefits, such as saving memory. You can use an iterator to solve the problem:

Stream<List<T>> chunk(Stream<T> stream, int size) {
  Iterator<T> iterator = stream.iterator();
  Iterator<List<T>> listIterator = new Iterator<>() {

    @Override
    public boolean hasNext() {
      return iterator.hasNext();
    }

    @Override
    public List<T> next() {
      // pull at most `size` elements from the source for this chunk
      List<T> result = new ArrayList<>(size);
      for (int i = 0; i < size && iterator.hasNext(); i++) {
        result.add(iterator.next());
      }
      return result;
    }
  };
  return StreamSupport.stream(((Iterable<List<T>>) () -> listIterator).spliterator(), false);
}
Tittup answered 3/12, 2019 at 19:34 Comment(3)
Very nice solution, +1. Just one improvement: you might want to return the stream as return StreamSupport.stream(Spliterators.spliteratorUnknownSize(listIterator, Spliterator.ORDERED), false);.Scarfskin
@PeterWalser would you mind to elaborate what does your suggestion do? Something to do with maintaining the chunked parts in order?Kobylak
a bit late, but i think he is avoiding the cast of the supplier iterator, with Spliterators you can just pass the iterator without the need to do a castHartsell
23

If you have a Guava dependency in your project, you could do this:

StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);

See https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-

Update: use Iterators.partition on Stream.iterator(). This terminates the Stream but does NOT consume it when creating the grouping Iterator. The result can be converted back to a Stream if needed.

Iterator<List<T>> listIterator = Iterators.partition(stream.iterator(), desiredSize);
Stream<List<T>> listStream = StreamSupport.stream(
  Spliterators.spliteratorUnknownSize(listIterator, 
  Spliterator.ORDERED), false);

https://guava.dev/releases/17.0/api/docs/com/google/common/collect/Iterators.html#partition(java.util.Iterator,%20int)

Oribel answered 16/5, 2018 at 9:55 Comment(3)
This solution splits a list and not a stream. Useful but not what @Edam asked.Carat
@Carat but you can make a stream with iterable (baeldung.com/java-iterable-to-stream).Aemia
Seems that this would be more appropriate to use guava.dev/releases/17.0/api/docs/com/google/common/collect/… Iterators.partition(stream.iterator(), size). This is non-consuming till the resulting Iterator is consumed and could be converted back to a Stream if needed.Gouveia
16

You can create a stream of chunks (List<T>) from a stream of items and a given chunk size by

  • grouping the items by the chunk index (element index / chunk size)
  • ordering the chunks by their index
  • mapping each map entry to its value (the chunk's elements)

Code:

public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    AtomicInteger index = new AtomicInteger(0);

    return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}

Example usage:

Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));

Output:

Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
Scarfskin answered 17/5, 2018 at 7:42 Comment(4)
Thanks, I used your solution. I removed the sort not needed in my case.Velutinous
Very nice solutionAbsenteeism
This solution will read the complete stream into a map before processing the chunks, rather than producing chunks "mid-stream". This might not be what you'd want/expect, especially for large streams which are probably the biggest use case for chunked processing.Dekeles
@MarkusRohlof yes, you're absolutely right. I just tried to come up with a solution for larger (and potentially infinite) streams, only to find out it looks precisely the same as the one suggested by dmitryvim, so I really can recommend his solution.Scarfskin
9

Using the StreamEx library, the solution would look like this:

Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
AtomicInteger counter = new AtomicInteger(0);
int chunkSize = 4;

StreamEx.of(stream)
        .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
        .forEach(chunk -> System.out.println(chunk));

Output:

[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14]

groupRuns accepts a predicate that decides whether two adjacent elements should be in the same group.

It produces a group as soon as it finds the first element that does not belong to it.
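
For example, the same predicate mechanism can group by element content instead of a counter. A minimal sketch (assuming StreamEx is on the classpath; the grouping rule is only an illustration, not part of the original answer):

StreamEx.of(99, 100, 101, 200)
        .groupRuns((prev, next) -> prev / 100 == next / 100)
        .forEach(System.out::println);
// prints: [99], then [100, 101], then [200]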

Grievous answered 25/7, 2016 at 17:58 Comment(7)
This doesn't work for a single record. For example, an integer stream of simply [1] would fail.Gooey
Stream of single item does work for me. What kind of error do you see? Could you post code that you tried?Grievous
The counter returns an incorrect value in the case there is one record.Gooey
I believe in the case of one record the groupRuns() is never called as it expects two entries. Is there a solution if the stream only returns one result? The incrementAndGet on the counter in your example is never hit and returns 0 if the chunk size is 1.Gooey
Grouping with chunk size 1 produces stream of lists of size 1. I believe it is expected behavior. Can you explain what do you try to achieve and what problem do you have? Probably with new stack overflow question - it is hard to share code in commentsGrievous
Raised: #45650490Gooey
This is nice, but only if the original stream is sequential, which I guess is the need of the original OP. Unfortunately, in my case I have a parallel stream source and nothing works, % chunkSize != 0 creates chunkSize chunks of average size total/chunkSize.Knossos
6

Here is a simple wrapping spliterator implementation that groups source elements into chunks:

public class ChunkedSpliterator<T> implements Spliterator<List<T>> {
    private static final int PROMOTED_CHARACTERISTICS = Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.SIZED | Spliterator.IMMUTABLE | Spliterator.CONCURRENT;
    private static final int SELF_CHARACTERISTICS = Spliterator.NONNULL;

    private final Spliterator<T> src;
    private final int chunkSize;

    public ChunkedSpliterator(Spliterator<T> src, int chunkSize) {
        if (chunkSize < 1)
            throw new IllegalArgumentException("chunkSize must be at least 1");
        this.src = src;
        this.chunkSize = chunkSize;
    }

    public static <E> Stream<List<E>> chunkify(Stream<E> src, int chunkSize) {
        ChunkedSpliterator<E> wrap = new ChunkedSpliterator<>(src.spliterator(), chunkSize);
        return StreamSupport.stream(wrap, src.isParallel());
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        List<T> result = new ArrayList<>((int) Math.min(src.estimateSize(), chunkSize));
        for (int i = 0; i < chunkSize; ++i) {
            if (!src.tryAdvance(result::add))
                break;
        }
        if (result.isEmpty())
            return false;
        action.accept(result);
        return true;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        Spliterator<T> srcSplit = src.trySplit();
        return srcSplit == null ? null : new ChunkedSpliterator<>(srcSplit, chunkSize);
    }

    @Override
    public long estimateSize() {
        long srcSize = src.estimateSize();
        if (srcSize <= 0L) return 0L;
        if (srcSize == Long.MAX_VALUE) return Long.MAX_VALUE;
        return (srcSize - 1) / chunkSize + 1;
    }

    @Override
    public int characteristics() {
        return (src.characteristics() & PROMOTED_CHARACTERISTICS) | SELF_CHARACTERISTICS;
    }
}

There is a handy chunkify shortcut method to make things easier:

    Stream<T> input = ...;
    Stream<List<T>> chunked = ChunkedSpliterator.chunkify(input, 1000);

Although Stream.spliterator() is a terminal operation, it does not forcibly exhaust the stream's source. So the stream can be processed via its spliterator gradually, without fetching all the data into memory - only one chunk at a time.

This spliterator preserves most of the input's characteristics. However, it is not sub-sized (chunks may be split in the middle), not sorted (it's not obvious how to sort chunks even if the elements are sortable), and it produces only non-null chunks (although chunks may still contain null elements). I'm not 100% sure about concurrent/immutable, but it seems it should inherit these with no problem. Also, produced chunks may not be exactly the requested size, but they never exceed it.
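
Applied to the question's batching scenario, a minimal usage sketch might look like this (domStream and database::flushChunk are assumed names taken from the question, not part of this class):

    Stream<DomObject> domStream = ...;
    ChunkedSpliterator.chunkify(domStream, 1000)
            .forEach(database::flushChunk);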

In fact, I'm very surprised such a popular question had no answer introducing a custom spliterator for almost 7 (!) years.

Widdershins answered 24/8, 2021 at 20:21 Comment(5)
Is there a reason you went with estimateSize() -> (srcSize-1)/batchSize+1, instead of maintaining a sequenceCounter and having (srcSize/batchSize)-sequenceCounter?Sarsaparilla
Using any internal sequence counter might be bad idea, because accuracy of this spliterator's estimateSize depends on accuracy of the delegate and that might vary while it is being consumed. The delegate's implementation might return less accurate results at its start and more accurate closer to end. As estimateSize should reflect most accurate current state it's better to rely on delegate's size every time.Widdershins
But (srcSize-1)/batchSize+1 would always give you inaccurate value. batchSize of 5, srcSize of 100. For first invocation of forEachRemaining() it should give you 20 but with the above calculation it would give you 16. With internal seq counter , i only see an issue if we parallelise the stream but that can be mitigated with shared seq counter.Sarsaparilla
(100 - 1) / 5 + 1 = 99 / 5 + 1 = 19 + 1 = 20. What's wrong?Widdershins
This should be the accepted answer.Mansized
4

Looks like no, because creating chunks means reducing the stream, and reduce means termination. If you need to maintain the stream's lazy nature and process chunks without collecting all the data first, here is my code (it does not work for parallel streams):

private static <T> BinaryOperator<List<T>> processChunks(Consumer<List<T>> consumer, int chunkSize) {
    return (data, element) -> {
        if (data.size() < chunkSize) {
            data.addAll(element);
            return data;
        } else {
            consumer.accept(data);
            return element; // in fact it's new data list
        }
    };
}

private static <T> Function<T, List<T>> createList(int chunkSize) {
    AtomicInteger limiter = new AtomicInteger(0);
    return element -> {
        limiter.incrementAndGet();
        if (limiter.get() == 1) {
            ArrayList<T> list = new ArrayList<>(chunkSize);
            list.add(element);
            return list;
        } else if (limiter.get() == chunkSize) {
            limiter.set(0);
        }
        return Collections.singletonList(element);
    };
}

and how to use

Consumer<List<Integer>> chunkProcessor = (list) -> list.forEach(System.out::println);

    int chunkSize = 3;

    Stream.generate(StrTokenizer::getInt).limit(13)
            .map(createList(chunkSize))
            .reduce(processChunks(chunkProcessor, chunkSize))
            .ifPresent(chunkProcessor);

static Integer i = 0;

static Integer getInt()
{
    System.out.println("next");
    return i++;
}

it will print

next next next next 0 1 2 next next next 3 4 5 next next next 6 7 8 next next next 9 10 11 12

The idea is to use the map operation to create lists following the 'pattern'

[1, _, _], [2], [3], [4, _, _], ...

(the first element of each chunk starts a fresh mutable list with capacity chunkSize; the other elements become singleton lists)

and merge (+process) that with reduce.

[1,2,3],[4,5,6],...

and don't forget to process the last 'trimmed' chunk with

.ifPresent(chunkProcessor);
Sills answered 28/8, 2019 at 15:23 Comment(0)
4

JEP 461: Stream Gatherers, a Java 22 preview language feature, adds built-in support for partitioning a stream into groups of a given size:

// [[0, 1, 2], [3, 4, 5], [6]]
Stream<List<Integer>> stream =
        Stream.of(0, 1, 2, 3, 4, 5, 6).gather(Gatherers.windowFixed(3));

This uses the new Stream.gather method with the new built-in Gatherers.windowFixed gatherer to convert the initial Stream<T> to a Stream<List<T>>.


For your specific scenario, this would probably end up looking something like:

domStream.gather(Gatherers.windowFixed(1000)).forEach(batch -> {
    insertElements(batch);
    flush();
});

Javadocs

Gatherer:

An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. […]

[…]

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); incremental reordering functions, etc. The class Gatherers provides implementations of common gathering operations.

Stream.gather:

Returns a stream consisting of the results of applying the given gatherer to the elements of this stream.

Gatherers.windowFixed

Returns a Gatherer that gathers elements into windows -- encounter-ordered groups of elements -- of a fixed size. If the stream is empty then no window will be produced. The last window may contain fewer elements than the supplied window size.

Example:

// will contain: [[1, 2, 3], [4, 5, 6], [7, 8]]
List<List<Integer>> windows =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
Byyourleave answered 8/1 at 16:59 Comment(0)
3

As Misha rightfully said, elegance is in the eye of the beholder. I personally think an elegant solution would be to let the class that inserts into the database do this task, similar to a BufferedWriter. This way it does not depend on your original data structure and can be used even with multiple streams, one after another. I am not sure if this is exactly what you mean by having the code in the accumulator, which you thought was wrong. I don't think it is wrong, since existing classes like BufferedWriter work this way. You get some flush control from the caller this way, by calling flush() on the writer at any point.

Something like the following code.

class BufferedDatabaseWriter implements Flushable {
    List<DomObject> buffer = new LinkedList<DomObject>();
    public void write(DomObject o) {
        buffer.add(o);
        if (buffer.size() >= 1000)
            flush();
    }
    public void flush() {
        //write buffer to database and clear it
    }
}

Now your stream gets processed like this:

BufferedDatabaseWriter writer = new BufferedDatabaseWriter();
stream.forEach(o -> writer.write(o));
//if you have more streams stream2.forEach(o -> writer.write(o));
writer.flush();

If you want to work multithreaded, you could run the flush asynchronously. Taking from the stream can't go in parallel, but I don't think there is a way to count 1000 elements from a stream in parallel anyway.
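
For example, a rough sketch of an asynchronous flush (the single-thread executor and the writeToDatabase helper are illustrative assumptions, not part of the class above):

private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();

public void flush() {
    List<DomObject> toWrite = buffer;
    buffer = new LinkedList<>();
    // hand the filled batch to a background thread; writeToDatabase is a hypothetical helper
    flushExecutor.submit(() -> writeToDatabase(toWrite));
}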

You can also extend the writer to allow setting the buffer size in the constructor, or you can make it implement AutoCloseable and run it in a try-with-resources, and more. These are the nice things you get from a BufferedWriter.
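
A small sketch of that extension (constructor-configurable batch size, plus AutoCloseable so the final partial batch is flushed automatically; the details here are illustrative, not from the original answer):

class BufferedDatabaseWriter implements Flushable, AutoCloseable {
    private final int batchSize;
    private List<DomObject> buffer = new LinkedList<>();

    BufferedDatabaseWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    public void write(DomObject o) {
        buffer.add(o);
        if (buffer.size() >= batchSize)
            flush();
    }

    @Override
    public void flush() {
        //write buffer to database and clear it
    }

    @Override
    public void close() {
        flush(); // make sure the last, partially filled batch is written
    }
}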

Scornful answered 17/5, 2018 at 8:4 Comment(1)
You can also make it AutoCloseable and then do try (BufferedDatabaseWriter bdw = new BufferedDatabaseWriter()) { stream.forEach(o -> writer.write(o)); }Laurynlausanne
1

Here is a Guava-based solution to process a stream in chunks:

Stream<Integer> stream = Stream.of(0, 1, 2, 3, 4, 5, 6);

// Batches: [[0, 1, 2], [3, 4, 5], [6]]
Iterators.partition(stream.iterator(), 3)
        .forEachRemaining(batch -> processBatch(batch));

This converts the initial Stream<T> to an Iterator<T> using Stream.iterator. It then uses the Guava Iterators.partition method to partition the iterator, resulting in an Iterator<List<T>>. Finally, the Iterator.forEachRemaining method is used to process each batched list of items.


This is what it might look like applied to your specific scenario:

Iterators.partition(domStream.iterator(), 1000).forEachRemaining(batch -> {
    insertElements(batch);
    flush();
});
Byyourleave answered 8/1 at 17:13 Comment(0)
0

You can use this class, https://github.com/1wpro2/jdk-patch/blob/main/FixedSizeSpliterator.java.

Pass in the chunk size as the THRESHOLD

new FixedSizeSpliterator(T[] values, int threshold)

Troup answered 17/3, 2021 at 8:0 Comment(0)
0

In case you need a very simple solution:

import java.util.List;

class Scratch {

    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        int chunkSize = 3;
        for (int i = 0; i < list.size() / chunkSize + Math.min(1, list.size() % chunkSize); i++) {
            List<Integer> subList = list.subList(i * chunkSize, Math.min(i * chunkSize + chunkSize, list.size()));
            System.out.println("subList = " + subList);
        }
    }
}

Output:

subList = [1, 2, 3]
subList = [4, 5, 6]
subList = [7, 8]
Falkirk answered 28/9, 2022 at 12:20 Comment(1)
I just wonder why the Oracle team doesn't provide such utilities, while other languages have tons of utilities integrated into the syntax or core language libraries. Java suffers from a lack of good utils.Falkirk
0

I wanted a solution which didn't rely on mutable state, and came up with this:

var idents = IntStream.range(0, 1000).boxed().toList();
int max = 10;
var result =
    idents.stream()
        .collect(() -> new ArrayList<Set<Integer>>(),
            (ret, id) -> {
                if (ret.isEmpty() || ret.get(ret.size() - 1).size() == max) {
                    ret.add(new HashSet<>());
                }
                ret.get(ret.size() - 1).add(id);
            },
            ArrayList::addAll);

It even works with a parallel stream, but in that case there might be more than one chunk with fewer than the maximum entries.

Lexi answered 2/6, 2023 at 8:22 Comment(0)
0

There's no built-in stream operation for this, and no really good way to build it on top of the built-in stream operations. But there are not-so-good ways.

One such way is to abuse flatMap with a mutable function which accumulates elements, and then emits batches when they are complete. The problem with this is that you need to emit one final batch when the stream ends, and stream operations don't get any kind of signal when that happens (as an aside, unlike transducers in Clojure, where this is easy). You can overcome that by appending a sentinel value to the stream, and emitting your last batch when you see it. Like this:

public static <E> Stream<List<E>> batch(Stream<E> stream, E sentinel, int batchSize) {
    return Stream.concat(stream, Stream.of(sentinel))
                 .flatMap(new Function<>() {
                     List<E> batch = null;

                     @Override
                     public Stream<List<E>> apply(E element) {
                         boolean emit;
                         if (element != sentinel) {
                             if (batch == null) batch = new ArrayList<>();
                             batch.add(element);
                             emit = batch.size() == batchSize;
                         } else {
                             emit = batch != null;
                         }

                         if (emit) {
                             List<E> batchToEmit = batch;
                             batch = null;
                             return Stream.of(batchToEmit);
                         } else {
                             return Stream.empty();
                         }
                     }
                 });
}

This is a bit more cumbersome than dmitryvim's answer. It is in some sense cleaner, because it doesn't introduce an additional terminal operation into what is a single logical stream pipeline. But I don't think there are any actual practical problems that stem from doing that.

So here's a more concise way to do it, using dmitryvim's approach of going via an iterator, but with amusingly nested Stream.generate calls instead of a hand-written iterator:

public static <E> Stream<List<E>> batch(Stream<E> stream, int batchSize) {
    Iterator<E> iterator = stream.iterator();
    return Stream.generate(() -> {
                     if (!iterator.hasNext()) return null;
                     return Stream.generate(() -> iterator.hasNext() ? iterator.next() : null)
                                  .takeWhile(Objects::nonNull)
                                  .limit(batchSize)
                                  .toList();
                 })
                 .takeWhile(Objects::nonNull);
}
Henriquez answered 30/10, 2023 at 15:49 Comment(0)
0

You can also just write your own Collector:

public class PartitionStream {
    
    public static void main(String[] args) {
        System.out.println(IntStream.range(0, 86).boxed().collect(partitionBy(10)));
    }

    static <T> Collector<T, List<List<T>>, List<List<T>>> partitionBy(int n) {
        Supplier<List<List<T>>> supplier = () -> new ArrayList<>();

        BiConsumer<List<List<T>>, T> accumulator = (acc, x) -> {
            if (acc.isEmpty() || acc.get(acc.size() - 1).size() >= n) {
                acc.add(new ArrayList<>());
            }
            acc.get(acc.size() - 1).add(x);
        };

        BinaryOperator<List<List<T>>> combiner = (l, r) -> {
            r.stream()
                    .flatMap(List::stream)
                    .forEach(x -> accumulator.accept(l, x));
            return l;
        };

        return Collector.of(supplier, accumulator, combiner);
    }
}
Granuloma answered 16/3 at 13:43 Comment(0)
