You can't in general interrupt the threads of an ExecutorService
from external code if you used ExecutorService::execute(Runnable)
to start the threads, because external code does not have a reference to the Thread
objects of each of the running threads (see the end of this answer for a solution though, if you need ExecutorService::execute
). However, if you instead use ExecutorService::submit(Callable<T>)
to submit the jobs, you get back a Future<T>
, which internally keeps a reference to the running thread once Callable::call()
begins execution. This thread can be interrupted by calling Future::cancel(true)
. Any code within (or called by) the Callable
that checks the current thread's interrupt status can therefore be interrupted via the Future
reference. This includes BlockingQueue::take()
, which, even when blocked, will respond to thread interruption. (JRE blocking methods will typically wake up if interrupted while blocked, realize they have been interrupted, and throw an InterruptedException
.)
To summarize: Future::cancel()
and Future::cancel(true)
both cancel future work, while Future::cancel(true)
also interrupts ongoing work (as long as the ongoing work responds to thread interrupt). Neither of the two cancel
invocations affects work that has already successfully completed.
Note that once a thread is interrupted by cancellation, an InterruptException
will be thrown within the thread (e.g. by BlockingQueue::take()
in this case). However, you a CancellationException
will be thrown back in the main thread the next time you call Future::get()
on a successfully cancelled Future
(i.e. a Future
that was cancelled before it completed). This is different from what you would normally expect: if a non-cancelled Callable
throws InterruptedException
, the next call to Future::get()
will throw InterruptedException
, but if a cancelled Callable
throws InterruptedException
, the next call to Future::get()
will through CancellationException
.
Here's an example that illustrates this:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public class Test {
public static void main(String[] args) throws Exception {
// Start Executor with 4 threads
int numThreads = 4;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
try {
// Set up BlockingQueue for inputs, and List<Future> for outputs
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
List<Future<String>> futures = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
int threadIdx = i;
futures.add(executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
// Get an input from the queue (blocking)
int val = queue.take();
return "Thread " + threadIdx + " got value " + val;
} catch (InterruptedException e) {
// Thrown once Future::cancel(true) is called
System.out.println("Thread " + threadIdx + " got interrupted");
// This value is returned to the Future, but can never
// be read, since the caller will get a CancellationException
return "Thread " + threadIdx + " got no value";
}
}
}));
}
// Enqueue (numThreads - 1) values into the queue, so that one thread blocks
for (int i = 0; i < numThreads - 1; i++) {
queue.add(100 + i);
}
// Cancel all futures
for (int i = 0; i < futures.size(); i++) {
Future<String> future = futures.get(i);
// Cancel the Future -- this doesn't throw an exception until
// the get() method is called
future.cancel(/* mayInterruptIfRunning = */ true);
try {
System.out.println(future.get());
} catch (CancellationException e) {
System.out.println("Future " + i + " was cancelled");
}
}
} finally {
// Terminate main after all threads have shut down (this call does not block,
// so main will exit before the threads stop running)
executor.shutdown();
}
}
}
Each time you run this, the output will be different, but here's one run:
Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted
This shows that Thread 2 and Thread 3 completed before Future::cancel()
was called. Thread 1 was cancelled, so internally InterruptedException
was thrown, and externally CancellationException
was thrown. Thread 0 was cancelled before it started running. (Note that the thread indices won't in general correlate with the Future
indices, so Future 0 was cancelled
could correspond to either thread 0 or thread 1 being cancelled, and the same for Future 1 was cancelled
.)
Advanced: one way to achieve the same effect with Executor::execute
(which does not return a Future
reference) rather than Executor::submit
would be to create a ThreadPoolExecutor
with a custom ThreadFactory
, and have your ThreadFactory
record a reference in a concurrent collection (e.g. a concurrent queue) for every thread created. Then to cancel all threads, you can simply call Thread::interrupt()
on all previously-created threads. However, you will need to deal with the race condition that new threads may be created while you are interrupting existing threads. To handle this, set an AtomicBoolean
flag, visible to the ThreadFactory
, that tells it not to create any more threads, then once that is set, cancel the existing threads.