Why does parallelStream not use the entire available parallelism?
I have a custom ForkJoinPool created with parallelism of 25.

customForkJoinPool = new ForkJoinPool(25);

I have a list of 700 file names and I used code like this to download the files from S3 in parallel and cast them to Java objects:

customForkJoinPool.submit(() -> {
   return fileNames
     .parallelStream()
     .map((fileName) -> {
        Logger log = Logger.getLogger("ForkJoinTest");
        long startTime = System.currentTimeMillis();
        log.info("Starting job at Thread:" + Thread.currentThread().getName());
        MyObject obj = readObjectFromS3(fileName);
        long endTime = System.currentTimeMillis();
        log.info("completed a job with Latency:" + (endTime - startTime));
        return obj;
     })
     .collect(Collectors.toList());
});

When I look at the logs, I see only 5 threads being used. With a parallelism of 25, I expected this to use 25 threads. The average latency to download and convert the file to an object is around 200ms. What am I missing?

Maybe the better question is: how does a parallelStream decide how much to split the source list before creating tasks for it? In this case, it looks like it decided to split five ways and stop.

Birchfield answered 12/6, 2015 at 22:15 Comment(1)
This problem is a bad fit for fork-join. For starters, you'd need to be using ManagedBlocker since your task is blocking. But since it's I/O, and not recursively divisible, it's still not a good fit. – Selemas

Why are you doing this with ForkJoinPool? It's meant for CPU-bound tasks with subtasks that are too fast to warrant individual scheduling. Your workload is IO-bound and with 200ms latency the individual scheduling overhead is negligible.

Use an Executor:

import static java.util.stream.Collectors.toList;
import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService threads = Executors.newFixedThreadPool(25);

// Submit every download first, then join: the intermediate collect
// ensures all 25 tasks are queued before any join blocks.
List<MyObject> result = fileNames.stream()
        .map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
        .collect(toList()).stream()
        .map(CompletableFuture::join)
        .collect(toList());
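The two-pass collect above is the important detail: the first `collect(toList())` forces every `supplyAsync` to be submitted before any `join` blocks. A self-contained sketch of the same pattern, where a 200 ms sleep stands in for `readObjectFromS3` (the class and method names here are illustrative):

```java
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class ParallelDownload {
    // Stand-in for readObjectFromS3: simulates a 200 ms blocking download.
    static String download(String fileName) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "object:" + fileName;
    }

    public static void main(String[] args) {
        List<String> fileNames = IntStream.range(0, 100)
                .mapToObj(i -> "file-" + i)
                .collect(toList());

        ExecutorService threads = Executors.newFixedThreadPool(25);
        long start = System.currentTimeMillis();

        // First collect submits all tasks; only then do we join them.
        List<String> result = fileNames.stream()
                .map(fn -> supplyAsync(() -> download(fn), threads))
                .collect(toList()).stream()
                .map(CompletableFuture::join)
                .collect(toList());

        long elapsed = System.currentTimeMillis() - start;
        threads.shutdown();

        // 100 tasks on 25 threads at 200 ms each is roughly 4 waves,
        // i.e. around 800 ms, versus ~20 s sequentially.
        System.out.println(result.size() + " objects in " + elapsed + " ms");
    }
}
```

If the stream were joined in a single pass (`.map(...).map(CompletableFuture::join)`), each future would be awaited as soon as it was created, serializing the downloads.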
Stoneblind answered 12/6, 2015 at 23:35 Comment(2)
Thanks. This answers my actual issue. Though I still wonder how parallelStream figures out how much to split and when to stop :) – Birchfield
@Birchfield - In this case, it doesn't .... and that is the problem. See my answer. – Separates

I think the answer is in this passage from the ForkJoinPool javadoc:

"The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization."

In your case, the downloads will be performing blocking I/O operations.
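The escape hatch the javadoc alludes to is `ForkJoinPool.ManagedBlocker`: wrapping a blocking call in `ForkJoinPool.managedBlock(...)` tells the pool that a worker is stalled, so it may activate a compensating thread. A minimal sketch, where a 200 ms sleep stands in for the S3 read (class and method names are illustrative):

```java
import java.util.concurrent.ForkJoinPool;

public class BlockingDownload {
    // Wraps one blocking download so the pool knows the worker is stalled
    // and may spin up a spare thread to keep parallelism up.
    static String downloadWithManagedBlock(String fileName) throws InterruptedException {
        final String[] result = new String[1];
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
            private boolean done = false;

            @Override
            public boolean block() throws InterruptedException {
                Thread.sleep(200);            // stand-in for the blocking S3 read
                result[0] = "object:" + fileName;
                done = true;
                return true;                  // no further blocking is needed
            }

            @Override
            public boolean isReleasable() {
                return done;                  // true once the value is ready
            }
        });
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(25);
        String obj = pool.submit(() -> {
            try {
                return downloadWithManagedBlock("file-0");
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }).get();
        System.out.println(obj);   // prints "object:file-0"
    }
}
```

Even with this, compensation threads are a workaround, not a design: for purely I/O-bound work, a fixed-size executor as in the accepted answer is the simpler fit.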

Separates answered 12/6, 2015 at 23:27 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.