How to properly submit and get several Futures in the same Java stream?

I am trying to submit 10 Futures and get their results in the same stream. Each task takes 1 second to run, and I would like them to run in parallel.

My first attempt is takes_10_sec(), which runs sequentially and takes 10 s.

My second attempt is takes_1_sec(), which runs in parallel and takes 1 s. However, it uses an intermediate .collect(Collectors.toList()).stream(), which I don't think is a good way to do it.

Is there another recommended way?

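import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.Test; // assumption: JUnit 4; the original does not say which test framework is used

public class FutureStream {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);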

    @Test
    public void takes_10_sec() {
        IntStream.range(0, 10)
                .mapToObj(i -> longTask())
                .map(task -> {
                    try {
                        return task.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .forEach(System.out::println);
    }

    @Test
    public void takes_1_sec() {
        IntStream.range(0, 10)
                .mapToObj(i -> longTask())
                .collect(Collectors.toList())
                .stream()
                .map(task -> {
                    try {
                        return task.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .forEach(System.out::println);
    }

    // Submits a task that sleeps for 1 second, then returns the name of the worker thread.
    private Future<String> longTask() {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return Thread.currentThread().getName();
        });
    }
}
Wifehood answered 1/6, 2017 at 4:31

Streams are lazy and only process elements as the terminal operation demands them. In a sequential stream, each element passes through the whole pipeline before the next element is started (parallel streams are the exception). This is what makes short-circuiting operations possible, for example.
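To see that element-by-element ordering for yourself, here is a minimal sketch. The class name LazyOrderDemo and the trace() helper are made up for illustration; they only record which pipeline stage touches which element, in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class LazyOrderDemo {
    // Hypothetical helper: records the order in which the two stages see each element.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        IntStream.range(0, 3)
                .mapToObj(i -> { events.add("create " + i); return i; })  // stage 1
                .map(i -> { events.add("consume " + i); return i; })      // stage 2
                .forEach(i -> { });                                       // terminal operation drives the pipeline
        return events;
    }

    public static void main(String[] args) {
        // prints [create 0, consume 0, create 1, consume 1, create 2, consume 2]
        System.out.println(trace());
    }
}
```

Element 0 goes through both stages before element 1 is even created, which is the same interleaving that happens with submit and get in takes_10_sec().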

Because your intermediate map() operation blocks on the result of the future that was just created, the pipeline waits for each future to complete before creating the next one.

Collecting them into a list, as you do, ensures that all futures are created (and all tasks submitted) before any result is awaited. This is the appropriate solution, because the whole submitting stream has to be processed before you start handling the results.
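If the chained .collect(...).stream() bothers you stylistically, one option is to make the two phases explicit with a named list. This is only a sketch of the same technique, not a different mechanism: the class TwoPhaseFutures and the helpers submitSquare() and join() are hypothetical names, and the 200 ms sleep stands in for your 1 s task.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class TwoPhaseFutures {
    // Hypothetical stand-in for longTask(): sleeps briefly, then returns i squared.
    static Future<Integer> submitSquare(ExecutorService executor, int i) {
        return executor.submit(() -> {
            Thread.sleep(200);
            return i * i;
        });
    }

    // Hypothetical helper: waits for a future and wraps the checked exceptions.
    static <T> T join(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    static List<Integer> squares(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(n);
        try {
            // Phase 1: submit every task; collecting to a list forces all submissions to happen.
            List<Future<Integer>> futures = IntStream.range(0, n)
                    .mapToObj(i -> submitSquare(executor, i))
                    .collect(Collectors.toList());
            // Phase 2: only now block on the results, in submission order.
            return futures.stream()
                    .map(TwoPhaseFutures::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(4)); // prints [0, 1, 4, 9]
    }
}
```

The behaviour is the same as in takes_1_sec(); the named variable just makes it obvious that the list is the barrier between submitting and waiting.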

Jodhpurs answered 1/6, 2017 at 8:58
