The call to wait will still block even if Future.cancel(..)
is called. As mentioned by others the CompletableFuture
will not use interrupts to cancel the task.
According to the javadoc of CompletableFuture.cancel(..)
:
mayInterruptIfRunning this value has no effect in this implementation because interrupts are not used to control processing.
Even if the implementation would cause an interrupt, you would still need a blocking operation in order to cancel the task or check the status via Thread.interrupted()
.
Instead of interrupting the Thread
, which might not be always easy to do, you may have check points in your operation where you can gracefully terminate the current task. This can be done in a loop over some elements that will be processed or you check before each step of the operation for the cancel status and throw an CancellationException
yourself.
The tricky part is to get a reference of the CompletableFuture
within the task in order to call Future.isCancelled()
. Here is an example of how it can be done:
public abstract class CancelableTask<T> {
private CompletableFuture<T> task;
private T run() {
try {
return compute();
} catch (Throwable e) {
task.completeExceptionally(e);
}
return null;
}
protected abstract T compute() throws Exception;
protected boolean isCancelled() {
Future<T> future = task;
return future != null && future.isCancelled();
}
public Future<T> start() {
synchronized (this) {
if (task != null) throw new IllegalStateException("Task already started.");
task = new CompletableFuture<>();
}
return task.completeAsync(this::run);
}
}
Edit: Here the improved CancelableTask
version as a static factory:
public static <T> CompletableFuture<T> supplyAsync(Function<Future<T>, T> operation) {
CompletableFuture<T> future = new CompletableFuture<>();
return future.completeAsync(() -> operation.apply(future));
}
here is the test method:
@Test
void testFuture() throws InterruptedException {
CountDownLatch started = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
Future<Object> future = supplyAsync(task -> {
started.countDown();
while (!task.isCancelled()) {
System.out.println("Count: " + counter.getAndIncrement());
}
System.out.println("Task cancelled");
done.countDown();
return null;
});
// wait until the task is started
assertTrue(started.await(5, TimeUnit.SECONDS));
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
assertTrue(done.await(5, TimeUnit.SECONDS));
}
If you really want to use interrupts in addition to the CompletableFuture
, then you can pass a custom Executor
to CompletableFuture.completeAsync(..)
where you create your own Thread
, override cancel(..)
in the CompletableFuture
and interrupt your Thread
.
Executors.newFixedThreadPool
vsExecutors.newWorkStealingPool
? It would make the question clearer to compare two different executor implementations than comparing futures vs. completable futures. – Karns