If I want to run a Stream in parallel in a background task is it possible to run it in lower priority? And if so how?
Is it possible to set the priority of the threads in Stream.parallel()?
Asked Answered
Yes it is possible.
The procedure is as follows:
Create a
ForkJoinWorkerThreadFactory
that creates threads with an appropriate priority.Create a
ForkJoinPool
using the above thread factory.Instantiate the parallel stream.
Run the stream by submitting it to the
ForkJoinPool
Something like this:
public class MyThread extends ForkJoinWorkerThread {
public MyThread(ForkJoinPool pool, int priority) {
super(pool);
setPriority(priority);
}
}
final int poolSize = ...
final int priority = ...
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyThread(pool, priority);
}
};
/*
ForkJoinWorkerThreadFactory factory = pool -> new MyThread(
pool,
priority
);
*/
ForkJoinPool customThreadPool = new ForkJoinPool(
poolSize, factory, null, false);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
(Example code adapted from http://www.baeldung.com/java-8-parallel-streams-custom-threadpool)
It still doesn't compile. I think you have to subclass it. –
Birth
This should do it:
ForkJoinWorkerThreadFactory factory = pool -> new ForkJoinWorkerThread(pool) {{ setPriority(priority); }};
–
Birth Did it a slightly different way ... but it compiles and runs. The only gotcha is that
submit
throws InterruptedException
which needs to be handled. –
Overstuff The second gotcha might be that (unless you're on Java 10) the number of fork join tasks created will be in proportion to the common pool parallelism –
Disgusting
ForkJoinPool.ForkJoinWorkerThreadFactory
is (structurally) a functional interface, so factory can be defined as pool -> new MyThread(pool, priority)
. –
Evangelize Should be noted that this behavior is an undocumented implementation details, as is the entire use of the Fork/Join pool. There is no guaranty that this works in other implementations. –
Bathetic
I think a better way to do this is like described here:
public class CustomForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
private final int threadPriority;
public CustomForkJoinWorkerThreadFactory(int threadPriority) {
this.threadPriority = threadPriority;
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool)
{
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setPriority(threadPriority);
return worker;
}
}
It allows you to still use a "default" ForkJoinWorkerThread, but you can set priority / name / etc. Use like this:
new ForkJoinPool(poolSize, new CustomForkJoinWorkerThreadFactory(Thread.MIN_PRIORITY), null, false);
Inline version of the previous answer
new ForkJoinPool(parallelism, (pool) -> {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setPriority(NORM_PRIORITY + 1);
return thread;
}, null, false);
© 2022 - 2024 — McMap. All rights reserved.
pool
argument. – Overstuff