Is it possible to set the priority of the threads in Stream.parallel()?
Asked Answered
C

3

12

If I want to run a Stream in parallel in a background task is it possible to run it in lower priority? And if so how?

Coucher answered 26/11, 2017 at 3:44 Comment(0)
O
10

Yes it is possible.

The procedure is as follows:

  1. Create a ForkJoinWorkerThreadFactory that creates threads with an appropriate priority.

  2. Create a ForkJoinPool using the above thread factory.

  3. Instantiate the parallel stream.

  4. Run the stream by submitting it to the ForkJoinPool

Something like this:

public class MyThread extends ForkJoinWorkerThread {
    public MyThread(ForkJoinPool pool, int priority) {
        super(pool);
        setPriority(priority);
    }
}

final int poolSize = ...
final int priority = ...

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
  .collect(Collectors.toList());

ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
         return new MyThread(pool, priority);
    }
};
/*
ForkJoinWorkerThreadFactory factory = pool -> new MyThread(
  pool,
  priority
);
*/

ForkJoinPool customThreadPool = new ForkJoinPool(
    poolSize, factory, null, false);
long actualTotal = customThreadPool.submit(
    () -> aList.parallelStream().reduce(0L, Long::sum)).get();

(Example code adapted from http://www.baeldung.com/java-8-parallel-streams-custom-threadpool)

Overstuff answered 26/11, 2017 at 5:53 Comment(7)
I said "something like" :-) It needed a pool argument.Overstuff
It still doesn't compile. I think you have to subclass it.Birth
This should do it: ForkJoinWorkerThreadFactory factory = pool -> new ForkJoinWorkerThread(pool) {{ setPriority(priority); }};Birth
Did it a slightly different way ... but it compiles and runs. The only gotcha is that submit throws InterruptedException which needs to be handled.Overstuff
The second gotcha might be that (unless you're on Java 10) the number of fork join tasks created will be in proportion to the common pool parallelismDisgusting
ForkJoinPool.ForkJoinWorkerThreadFactory is (structurally) a functional interface, so factory can be defined as pool -> new MyThread(pool, priority).Evangelize
Should be noted that this behavior is an undocumented implementation details, as is the entire use of the Fork/Join pool. There is no guaranty that this works in other implementations.Bathetic
W
1

I think a better way to do this is like described here:

public class CustomForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

    private final int threadPriority;

    public CustomForkJoinWorkerThreadFactory(int threadPriority) {
        this.threadPriority = threadPriority;
    }

    @Override           
    public ForkJoinWorkerThread newThread(ForkJoinPool pool)
    {
        final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setPriority(threadPriority);
        return worker;
    }
}

It allows you to still use a "default" ForkJoinWorkerThread, but you can set priority / name / etc. Use like this:

new ForkJoinPool(poolSize, new CustomForkJoinWorkerThreadFactory(Thread.MIN_PRIORITY), null, false);
Wessex answered 11/7, 2018 at 20:34 Comment(0)
D
0

Inline version of the previous answer

new ForkJoinPool(parallelism, (pool) -> {
    ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    thread.setPriority(NORM_PRIORITY + 1);
    return thread;
}, null, false);
Dialogize answered 7/6, 2023 at 6:59 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.