Java parallelStream() with custom pool with caller work stealing?

Normally when one uses Java 8's parallelStream(), the result is execution via the default, common fork-join pool (i.e. ForkJoinPool.commonPool()).

That is clearly undesirable, however, if the work is far from CPU-bound, e.g. if it may spend much of its time waiting on I/O. In such cases one will want to use a separate pool, sized according to other criteria (e.g. how much of the time the tasks are likely to actually be using the CPU).

There's no obvious means of getting parallelStream() to use a different pool, but there is a way as detailed here.
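A minimal sketch of that kind of workaround, assuming it is the commonly cited one of submitting the terminal operation as a task to the custom pool (as the next paragraph describes). The class and method names are made up for illustration. Because the stream's subtasks are forked from a worker of the custom pool, they run in that pool; this relies on the Stream API happening to use fork/join tasks, which is unspecified behaviour.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

public class CustomPoolStream {
    /** True if the current thread is a worker thread of the given pool. */
    static boolean runningIn(ForkJoinPool pool) {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    /**
     * Invokes a parallel stream's terminal operation from inside a task
     * submitted to {@code pool}; sets {@code leaked} if any element is
     * processed by a thread that is not one of that pool's workers.
     */
    static long sumInPool(ForkJoinPool pool, AtomicBoolean leaked)
            throws InterruptedException, ExecutionException {
        return pool.submit(() -> IntStream.rangeClosed(1, 1_000)
                        .parallel()
                        .peek(i -> { if (!runningIn(pool)) leaked.set(true); })
                        .asLongStream()
                        .sum())
                .get(); // the calling thread only blocks here, it does no work
    }

    public static void main(String[] args) throws Exception {
        // Sized for mostly-blocking work rather than for the number of CPUs.
        ForkJoinPool ioPool = new ForkJoinPool(16);
        AtomicBoolean leaked = new AtomicBoolean(false);
        try {
            System.out.println(sumInPool(ioPool, leaked)); // 500500
            System.out.println(leaked.get());
        } finally {
            ioPool.shutdown();
        }
    }
}
```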

Unfortunately, that approach entails invoking the terminal operation on the parallel stream from a fork-join pool thread. The downside of this is that if the target fork-join pool is completely busy with existing work, the whole execution will wait on it while the calling thread does absolutely nothing. Thus the pool can become a bottleneck, worse than single-threaded execution. By contrast, when one uses parallelStream() in the "normal" fashion, ForkJoinPool.common.externalHelpComplete() or ForkJoinPool.common.tryExternalUnpush() are used to let the calling thread from outside the pool help in the processing.
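To make that downside concrete, here is a small sketch (class and method names are hypothetical) using a one-thread pool whose only worker is kept busy by unrelated spinning work. The caller's timed get() just runs into a TimeoutException because the caller does not help; only after the worker is released does the stream run. The 200 ms timeout and the spinning blocker are arbitrary choices for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

public class BusyPoolDemo {
    /**
     * Returns true if the caller was left waiting while the pool's only worker
     * was busy, and the stream still produced the right result afterwards.
     */
    static boolean callerWaitsOnBusyPool() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);
        try {
            // Unrelated work that occupies the single worker until released.
            pool.submit(() -> {
                started.countDown();
                while (!release.get()) {
                    Thread.yield();
                }
            });
            started.await(); // make sure the worker really is occupied
            ForkJoinTask<Integer> task =
                    pool.submit(() -> IntStream.range(0, 100).parallel().sum());
            boolean timedOut = false;
            try {
                task.get(200, TimeUnit.MILLISECONDS); // caller does no work meanwhile
            } catch (TimeoutException e) {
                timedOut = true;
            }
            release.set(true); // free the worker so our stream can finally run
            return timedOut && task.get() == 4950;
        } finally {
            release.set(true);
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callerWaitsOnBusyPool());
    }
}
```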

Does anyone know of a way to both get parallelStream() to use a non-default fork-join pool and have a calling thread from outside the fork-join pool help in the processing of this work (but not the rest of the fork-join pool's work)?

Sherlynsherm answered 1/10, 2015 at 15:39
I don't understand your "The downside of this is that if the target fork-join pool is completely busy with existing work". Wouldn't you be creating a new pool just for this parallel stream invocation? (Aeronautics)
It’s even worse. When you call get on a task that is not in the common pool, it will still call ForkJoinPool.common.tryExternalUnpush(), but, of course, won’t find the task in the common pool’s queue. (Seisin)
To answer the question: no, I would not be creating a new thread pool just for this invocation. Rather, I'd be sharing this other thread pool across many similar invocations, some of which could be overlapping, some of which could have much longer/bigger tasks than others, etc. (Sherlynsherm)

You can use awaitQuiescence on the pool to help out. However, you can’t select which task(s) you will help with; it will just take the next pending task from the pool. Thus, if there are more pending tasks, you might end up executing these before getting to your own.

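import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class HelpOut {
    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(1);
        // make all threads busy (and wait until the worker has actually started blocking):
        CountDownLatch busy = new CountDownLatch(1);
        forkJoinPool.submit(() -> { busy.countDown(); LockSupport.parkNanos(Long.MAX_VALUE); });
        busy.await();
        // submit our task (may contain your stream operation)
        ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
        // help out
        while (!task.isDone()) // use zero timeout to execute one task only
            forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
        System.out.println(Thread.currentThread() == task.get());
    }
}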

will print true.

whereas

import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class HelpOutOverloaded {
    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(1);
        // make all threads busy:
        forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
        // overload:
        forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
        // submit our task (may contain your stream operation)
        ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
        // help out
        while (!task.isDone())
            forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
        System.out.println(Thread.currentThread() == task.get());
    }
}

will hang forever as it attempts to execute the second blocking task.

Nevertheless, it will let the initiating thread help process the pool’s pending tasks, which raises the chance of its own task getting executed, as long as there are no never-ending tasks (the example above is extreme and was chosen only for demonstration).
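The help-out loop above can be packaged as a small helper. The following is only a sketch: invokeHelping and the demo method are hypothetical names, not JDK APIs, and the helper inherits the same limitation, i.e. the caller may run any pending task of the pool, not just its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

public class HelpingInvoke {
    /**
     * Hypothetical helper: submits {@code work} to {@code pool} and lets the
     * calling thread run the pool's pending tasks (any of them, not only ours)
     * until {@code work} has completed.
     */
    static <T> T invokeHelping(ForkJoinPool pool, Callable<T> work)
            throws InterruptedException, ExecutionException {
        ForkJoinTask<T> task = pool.submit(work);
        while (!task.isDone()) {
            pool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
        }
        return task.get();
    }

    /** Returns true if the caller ran its own task while the only worker was busy. */
    static boolean callerRunsTaskWhenPoolBusy() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);
        // Occupy the single worker; wait until it has really picked this task up,
        // otherwise the caller could end up running this spinning task itself.
        pool.submit(() -> {
            started.countDown();
            while (!release.get()) {
                Thread.yield();
            }
        });
        try {
            started.await();
            Thread caller = Thread.currentThread();
            Thread runner = invokeHelping(pool, Thread::currentThread);
            return runner == caller;
        } finally {
            release.set(true);
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callerRunsTaskWhenPoolBusy());
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            int sum = invokeHelping(pool, () -> IntStream.range(0, 100).parallel().sum());
            System.out.println(sum); // 4950
        } finally {
            pool.shutdown();
        }
    }
}
```

One caveat worth keeping in mind: according to the ForkJoinTask.fork() documentation, a task forked from a thread that is not running in a fork-join pool goes to ForkJoinPool.commonPool(). So when the caller ends up executing a submitted stream task itself, that stream's subtasks are likely to be forked into the common pool rather than the custom one.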


But note that the entire relationship between the Fork/Join framework and the Stream API is an implementation detail anyway.

Seisin answered 1/10, 2015 at 18:17
I had concluded that I could do that, but as you note, that could very well mean that I end up helping out some other thread with its tasks rather than working on my own. That's kind of a non-starter. Also, I get that fork-join is an implementation detail, but there needs to be better control over parallelStream(), e.g. something as simple as parallelStream(forkJoinPool). (Sherlynsherm)
Well, with a method like parallelStream(ForkJoinPool), it wouldn’t be an implementation detail anymore… (Seisin)
No, but what parallelStream() with no arguments does would still be an implementation detail. This would just give you the option to take a bit of control in cases where you need it. (Sherlynsherm)