Custom thread pool in Java 8 parallel stream
A

16

489

Is it possible to specify a custom thread pool for a Java 8 parallel stream? I cannot find it anywhere.

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded, so I want to compartmentalize it. I do not want a slow-running task in one module of the application to block tasks from another module.

If I cannot use different thread pools for different modules, it means I cannot safely use parallel streams in most real-world situations.

Try the following example. There are some CPU-intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that the other threads get stuck and wait for the broken task to finish. This is a contrived example, but imagine a servlet app and someone submitting a long-running task to the shared fork-join pool.

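import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Static imports assumed by the unqualified calls below.
import static java.lang.Math.sqrt;
import static java.util.stream.LongStream.range;
import static java.util.stream.LongStream.rangeClosed;

// Utils.sleep(int) is a helper that is not shown in the post.
public class ParallelTest {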
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
Archdeacon answered 16/1, 2014 at 13:26 Comment(3)
What do you mean by custom thread pool? There is a single common ForkJoinPool but you can always create your own ForkJoinPool and submit requests to it.Chancery
Hint: Java Champion Heinz Kabutz inspects the same problem but with an even worse impact: deadlocking threads of the common fork-join pool. See javaspecialists.eu/archive/Issue223.html – Reproachless
The accepted answer is wrong. At least with Java 8u352 and Java 17, the stream uses both the dedicated and the common fork-join-pool. You can reach the wanted behaviour using CompletableFuture as I have outlined in my answer (far, far) below. You should think about removing the green tick.Answer
A
511

There actually is a trick for executing a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
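A small sketch of that rule in isolation, not taken from the answer. The class and method names (ForkStaysInPool, poolUsedByForkedChild) are made up for illustration. A parent task forks a child from inside a custom pool, and the child reports ForkJoinTask.getPool(), which should be the custom pool rather than the common one. That parallel streams rely on this behaviour is an implementation detail, as the comments below point out.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkStaysInPool {

    /** Reports the pool hosting the thread that executes this task. */
    static class CurrentPool extends RecursiveTask<ForkJoinPool> {
        @Override
        protected ForkJoinPool compute() {
            // Static method: returns null when called outside any ForkJoinPool.
            return ForkJoinTask.getPool();
        }
    }

    /** Forks a child and returns the pool the child was executed in. */
    static class Parent extends RecursiveTask<ForkJoinPool> {
        @Override
        protected ForkJoinPool compute() {
            CurrentPool child = new CurrentPool();
            child.fork();        // no pool is passed; it is taken from the current worker thread
            return child.join();
        }
    }

    /** Hypothetical helper: runs Parent in the given pool and reports where the child ran. */
    static ForkJoinPool poolUsedByForkedChild(ForkJoinPool pool) {
        return pool.submit(new Parent()).join();
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            System.out.println(poolUsedByForkedChild(custom) == custom);
        } finally {
            custom.shutdown();
        }
    }
}
```

The same lookup happens when a parallel stream splits its work into subtasks from a worker thread of the custom pool, which is why wrapping the terminal operation in submit() is enough to move it.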

Archdeacon answered 8/3, 2014 at 13:12 Comment(17)
Details on the solution are described here blog.krecan.net/2014/03/18/…Archdeacon
But is it also specified that streams use the ForkJoinPool or is that an implementation detail? A link to the documentation would be nice.Unholy
@NicolaiParlog You are right, I can't find it written anywhere. There is a related question hereArchdeacon
@Archdeacon Thanks for the snippet. I will add that the ForkJoinPool instance should be shutdown() when it's not needed any longer to avoid a thread leak. (example)Carbaugh
@Archdeacon what about using more than 2? Why doesn't it work if I use 100 for example?Flesher
@PabloMatiasGomez You mean the "2" used as the ForkJoinPool constructor argument? It works (if I remember it correctly) and actually makes sense for blocking tasks.Archdeacon
There's a nice test case here as well.Paynter
Using join() instead of get() will more closely match the default behavior without the custom pool. (It will not throw checked exceptions that get() does.)Eats
Note that there's a bug in Java 8 that even though tasks are running on a custom pool instance, they are still coupled to the shared pool: the size of the computation remains in proportion to the common pool and not the custom pool. Was fixed in Java 10: JDK-8190974Ravi
Is there any way to achieve this using ThreadPoolExecutor? I found out that the first task is executed by ThreadPool but others will be executed by common fork join pool.Stuppy
@terran This issue has also been fixed for Java 8: bugs.openjdk.java.net/browse/JDK-8224620 – Aleshia
@CutbertoOcampo Nice. Thought that Oracle doesn't downstream any longer. Maybe RedHat took over.Ravi
Why can't it be in the official streams API? What if streams stop using ForkJoinPool? – Shepperd
@Carbaugh I agree, I edited the code snippet to include .shutdown(). There was also a missing .boxed() before .collect(...).Azaleah
FYI -- the JDK team does not like this solution at all -- see: mail.openjdk.java.net/pipermail/jdk-dev/2019-October/… mail.openjdk.java.net/pipermail/jdk-dev/2019-October/… mail.openjdk.java.net/pipermail/jdk-dev/2019-October/…Azaleah
I doubt this is correct. Any IntStream.range(..).parallel().collect(Collectors.toList()); will run in sync by default, at least on my machine jdk11. So the above is no proof. Change .collect() to .forEach(i -> System.out.println(i)) and you will see the numbers are printed in order even without the ForkJoinPool wrapper.Firebreak
This did not work, neither with 8u352 nor with 17.0.1. I had to encapsulate the complete stream handling into CompletableFuture.(supply|run)Async(), explicitly passing the dedicated ForkJoinPool as executor (2nd parameter). Only then did the parallel stream stop using the main thread or the common fork-join pool. – Answer
E
248

Parallel streams use the default ForkJoinPool.commonPool, which by default has one thread fewer than you have processors, as returned by Runtime.getRuntime().availableProcessors(). (This means that parallel streams leave one processor for the calling thread.)

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
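For the ManagedBlocker remark, here is a rough sketch of what that could look like. It is not from the original answer, and the names ManagedSleep, SleepBlocker, managedSleep and sumWithBlocking are invented for the example. The idea is that a blocking call is wrapped in a ForkJoinPool.ManagedBlocker so that the pool is told a worker is about to block and may add a compensating thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ManagedSleep {

    /** Wraps a plain sleep so the pool knows the worker is blocked. */
    static final class SleepBlocker implements ForkJoinPool.ManagedBlocker {
        private final long millis;
        private boolean done;

        SleepBlocker(long millis) {
            this.millis = millis;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (!done) {
                Thread.sleep(millis);   // stands in for any blocking call (I/O, lock, ...)
                done = true;
            }
            return true;                // true = no further blocking needed
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    /** Sleeps through ForkJoinPool.managedBlock instead of calling Thread.sleep directly. */
    static void managedSleep(long millis) {
        try {
            ForkJoinPool.managedBlock(new SleepBlocker(millis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sums 0..n-1 in a parallel stream where every element blocks for a while. */
    static int sumWithBlocking(int n, long millis) {
        return IntStream.range(0, n)
                .parallel()
                .map(i -> {
                    managedSleep(millis);
                    return i;
                })
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumWithBlocking(20, 100));
    }
}
```

How many extra threads the pool actually starts is up to the ForkJoinPool implementation, so treat this as a way to reduce starvation rather than a guarantee of a fixed degree of parallelism.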

To change the way parallel streams are executed, you can either

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething)).get(); or
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads.

Example of the latter on my machine which has 8 processors. If I run the following program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
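One caveat worth checking on your own setup: the common pool is created once, when ForkJoinPool is first loaded, and it reads the property at that moment. Setting it after any code has already touched the common pool (for example an earlier parallel stream) has no effect. The sketch below, with the made-up names CommonPoolSize and requestParallelism, sets the property and prints what the common pool actually reports, so you can see whether the setting arrived in time.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {

    /** Property name as used in the answer above. */
    static final String PROP = "java.util.concurrent.ForkJoinPool.common.parallelism";

    /**
     * Hypothetical helper: requests a parallelism level and returns the level
     * the common pool really has. The two differ if the pool was initialized
     * before the property was set.
     */
    static int requestParallelism(int requested) {
        System.setProperty(PROP, Integer.toString(requested));
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int actual = requestParallelism(20);
        System.out.println("requested 20, common pool reports " + actual);
    }
}
```

Passing the value on the command line with -D avoids the ordering problem altogether, because the property is then set before any application code runs.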

Enemy answered 16/1, 2014 at 20:58 Comment(10)
The commonPool has actually one less than availableProcessors, resulting in total parallelism equal to availableProcessors because the calling thread counts as one.Photometer
submit returns a ForkJoinTask. To imitate parallel(), get() is needed: stream.parallel().forEach(doSomething)).get(); – Custodial
I am not convinced that ForkJoinPool.submit(() -> stream.forEach(...)) will run my Stream actions with the given ForkJoinPool. I would expect that the whole Stream action is executed in the ForkJoinPool as ONE action, but internally still using the default/common ForkJoinPool. Where did you see that ForkJoinPool.submit() would do what you say it does? – Bondy
@FredericLeitenberger You probably meant to place your comment below Lukas' answer.Enemy
I see now that https://mcmap.net/q/66020/-custom-thread-pool-in-java-8-parallel-stream shows nicely that it actually works as announced. Yet I still don't understand HOW it works. But I'm fine with "it works". Thanks! – Bondy
@FredericLeitenberger one property of the fork/join framework is that tasks can split off subtasks. For this to work (without handing the thread pool down) it has a built-in context/concept of "current pool". See ForkJoinTask#fork(): hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/… – Metabolism
@Metabolism thanks. And funny! I just stumbled across this code in my project again and then I saw your comment here! My question is 2.5 years old! :-) What a coincidence. – Bondy
I was researching the issue because of heavy parallelStream() usage in some code reviews. I also noticed it's covered in some later answers, so sorry for the duplicate.Metabolism
Don't you mean "calling" thread instead of "main" thread?Nucleoprotein
I suggest reverting the Tod Casasent’s edit, as nothing in JDK-8190974 suggests that System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", …) shall not work anymore and as of JDK 18, it still works as intended.Alexandriaalexandrian
C
55

As an alternative to the trick of triggering the parallel computation inside your own forkJoinPool, you can also pass that pool to the CompletableFuture.supplyAsync method, as in:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    // parallel task here, for example
    // (boxed() is needed because an IntStream cannot be collected with toList() directly)
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).boxed().collect(toList()),
    forkJoinPool
);
Corenda answered 3/1, 2015 at 8:5 Comment(0)
V
24

The original solution (setting the ForkJoinPool common parallelism property) no longer works. Looking at the links in the original answer, an update which breaks this has been backported to Java 8. As mentioned in the linked threads, this solution was never guaranteed to work forever. Based on that, the solution is the forkJoinPool.submit(...).get() approach discussed in the accepted answer. I think the backport also fixes the unreliability of that approach.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
Vickery answered 26/8, 2016 at 18:15 Comment(5)
I don't see the change in parallelism when I do ForkJoinPool.commonPool().getParallelism() in debug mode.Unpolitic
Thanks. I did some testing/research and updated the answer. Looks like an update changed it, as it works in older versions.Vickery
Why do I keep getting this: unreported exception InterruptedException; must be caught or declared to be thrown even with all the catch exceptions in the loop.Wherry
Rocky, I'm not seeing any errors. Knowing the Java version and the exact line will help. The "InterruptedException" suggests the try/catch around the sleep is not closed properly in your version.Vickery
When I do System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); System.out.println(ForkJoinPool.commonPool().getParallelism());, it consistently prints 10 on all versions from JDK 8 to JDK 18. I don’t know why you claim that this common parallelism property does not work; the link you’ve added to the other answer does not even remotely say anything about this property and its patch does not touch this functionality at all.Alexandriaalexandrian
S
18

We can change the default parallelism using the following property:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

which lets the common pool use more parallelism.

Severn answered 22/2, 2019 at 6:59 Comment(3)
Although it's a global setting, it works to increase the parallelism of parallelStream. – Accessible
Same person as above, this is not working for me on openjdk "11.0.6"Barkley
@Barkley worked on all versions I tried, from Java 8 to Java 18.Alexandriaalexandrian
P
10

To measure the actual number of used threads, you can check Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

On a 4-core CPU this can produce output like:

5 // common pool
23 // custom pool

Without .parallel() it gives:

3 // common pool
4 // custom pool
Pang answered 21/1, 2016 at 17:49 Comment(1)
The Thread.activeCount() doesn't tell you what threads are processing your stream. Map to Thread.currentThread().getName() instead, followed by a distinct(). Then you will realize that not every thread in the pool will be used... Add a delay to your processing and all threads in the pool will be utilized.Iphigeniah
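A sketch of what the comment suggests, with invented names (WorkerNames, namesUsedBy, pause). Instead of counting all live threads, each element is mapped to the name of the thread that processed it and the distinct names are collected into a set. The short pause is the "delay" from the comment, there to give the other workers time to pick up subtasks.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WorkerNames {

    /** Runs a small parallel stream inside the given pool and returns the distinct thread names seen. */
    static Set<String> namesUsedBy(ForkJoinPool pool) {
        return pool.submit(() ->
                IntStream.range(0, 64)
                        .parallel()
                        .mapToObj(i -> {
                            pause(10);  // without a delay, one or two threads may finish everything
                            return Thread.currentThread().getName();
                        })
                        .collect(Collectors.toSet())
        ).join();
    }

    /** Plain sleep that restores the interrupt flag instead of swallowing it. */
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(4);
        try {
            System.out.println(namesUsedBy(custom));
        } finally {
            custom.shutdown();
        }
    }
}
```

Which names show up, and how many, depends on the JDK version and on timing, so no expected output is given here.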
B
9

Until now, I used the solutions described in the answers of this question. Now, I came up with a little library called Parallel Stream Support for that:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

But as @PabloMatiasGomez pointed out in the comments, there are drawbacks regarding the splitting mechanism of parallel streams, which depends heavily on the size of the common pool. See "Parallel stream from a HashSet doesn't run in parallel".

I am using this solution only to have separate pools for different types of work but I can not set the size of the common pool to 1 even if I don't use it.

Bugaboo answered 9/8, 2016 at 20:6 Comment(0)
D
6

Note: There appears to be a fix implemented in JDK 10 that ensures the Custom Thread Pool uses the expected number of threads.

Parallel stream execution within a custom ForkJoinPool should obey the parallelism https://bugs.openjdk.java.net/browse/JDK-8190974

Donne answered 13/6, 2018 at 20:9 Comment(0)
D
5

If you don't want to rely on implementation hacks, there's always a way to achieve the same by implementing custom collectors that will combine map and collect semantics... and you wouldn't be limited to ForkJoinPool:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

Luckily, it's done already here and available on Maven Central: http://github.com/pivovarit/parallel-collectors

Disclaimer: I wrote it and take responsibility for it.

Drapery answered 1/2, 2019 at 13:51 Comment(0)
A
4

The (currently) accepted answer is partly wrong. It is not sufficient to just submit() the parallel stream to the dedicated fork-join pool. In that case, the stream will use that pool's threads and additionally the common fork-join pool and even the calling thread to handle the stream's workload, apparently up to the size of the common fork-join pool. The behaviour is a bit weird, and it is definitely not what is required.

To actually restrict the work completely to the dedicated pool, you must encapsulate it in a CompletableFuture:

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = CompletableFuture.supplyAsync(() ->
            // Parallel task here, for example
            IntStream.range(1, 1_000_000).parallel()
                    .filter(PrimesPrint::isPrime)
                    .boxed().collect(Collectors.toList()),
            forkJoinPool)  // <- passes dedicated fork-join pool as executor
        .join();  // <- Wait for result from forkJoinPool
    System.out.println(primes);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

This code stays with all operations in forkJoinPool on both Java 8u352 and Java 17.0.1.

Answer answered 20/11, 2022 at 13:12 Comment(0)
B
1

You can use abacus-common. The number of threads can be specified for a parallel stream. Here is the sample code:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Disclosure: I'm the developer of abacus-common.

Bruton answered 2/12, 2016 at 3:26 Comment(0)
V
1

If you don't need a custom ThreadPool but you rather want to limit the number of concurrent tasks, you can use:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));
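If Guava is not on the classpath, the partitioning step can be sketched with plain JDK collections. The class name Partitions and the helper partition are hypothetical and only mimic what Lists.partition is used for above: consecutive sublists of at most the given size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Partitions {

    /** Splits a list into consecutive views of at most {@code size} elements each. */
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> parts = new ArrayList<>();
        for (int from = 0; from < list.size(); from += size) {
            int to = Math.min(from + size, list.size());
            parts.add(list.subList(from, to));  // a view on the original list, not a copy
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Groups are handled one after another; only the items inside a group run in parallel.
        partition(numbers, 4).forEach(group ->
                group.parallelStream().forEach(n -> {
                    // do your processing
                }));
    }
}
```

Because each group is finished before the next one starts, at most `size` items are processed at the same time, at the cost of waiting for the slowest item in every group.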

(The duplicate question asking for this is locked, so please bear with me here.)

Vinia answered 1/11, 2018 at 10:10 Comment(0)
S
1

Here is how I set the max thread count flag mentioned above programmatically, along with a code snippet to verify that the parameter is honored:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]
Seaway answered 18/8, 2020 at 8:3 Comment(0)
D
0

If you don't mind using a third-party library, with cyclops-react you can mix sequential and parallel Streams within the same pipeline and provide custom ForkJoinPools. For example

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Or if we wished to continue processing within a sequential Stream

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Disclosure: I am the lead developer of cyclops-react]

Dorsiventral answered 10/3, 2017 at 12:4 Comment(0)
K
0

I tried the custom ForkJoinPool as follows to adjust the pool size:

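// Thread-safe set: peek() below adds to it from several worker threads at once.
private static final Set<String> ThreadNameSet = ConcurrentHashMap.newKeySet();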
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Here is the output saying the pool is using more threads than the default 4.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

But there is something odd. When I tried to achieve the same result using a ThreadPoolExecutor as follows:

BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

it did not work.

It only starts the parallelStream in a new thread; everything else stays the same, which again shows that the parallelStream uses a ForkJoinPool to run its subtasks.
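A quick way to see the difference, sketched with invented names (InPoolCheck, runsInsideForkJoinPool): ForkJoinTask.inForkJoinPool() reports whether the current thread is a fork-join worker. A task submitted to a ThreadPoolExecutor runs on an ordinary thread, so anything it forks falls back to the common pool, while the same task submitted to a ForkJoinPool runs on one of that pool's workers.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class InPoolCheck {

    /** Submits a probe to the executor and reports whether it ran on a fork-join worker thread. */
    static boolean runsInsideForkJoinPool(ExecutorService executor) throws Exception {
        Callable<Boolean> probe = ForkJoinTask::inForkJoinPool;
        return executor.submit(probe).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        try {
            System.out.println("ThreadPoolExecutor: " + runsInsideForkJoinPool(threadPool));
            System.out.println("ForkJoinPool:       " + runsInsideForkJoinPool(forkJoinPool));
        } finally {
            threadPool.shutdown();
            forkJoinPool.shutdown();
        }
    }
}
```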

Kosciusko answered 29/5, 2018 at 1:11 Comment(3)
What could be the possible reason behind not allowing other executors?Stuppy
@Stuppy That’s a good question. Perhaps you could start a new question and provide more details to elaborate your ideas ;) – Kosciusko
@Stuppy because it never was an intended feature. The Stream implementation happened to use Fork/Join tasks and it wasn’t considered that these tasks have the feature of picking up the caller’s pool if being called from a Fork/Join pool’s worker thread. Even today, this trick is not documented nor officially supported. That’s also the reason why the first versions didn’t respect the custom pool’s parallelism but inconsistently used the common pool’s. Using a different pool wasn’t foreseen.Alexandriaalexandrian
C
0

I made a utility method that runs a task in parallel, with an argument that defines the maximum number of threads.

public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
    ForkJoinPool forkJoinPool = null;
    try {
        forkJoinPool = new ForkJoinPool(maxThreads);
        forkJoinPool.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown();
        }
    }
}

It creates a ForkJoinPool with the maximum number of allowed threads and shuts it down after the task completes (or fails).

Usage is as follows:

final int maxThreads = 4;
runParallel(maxThreads, () -> 
    IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList()));
Charissacharisse answered 21/7, 2022 at 15:38 Comment(0)
