Removing all queued tasks of an ThreadPoolExecutor
Asked Answered
B

10

31

i have this rather simple question about the ThreadPoolExecutor. I have the following situation: I have to consume objects from a queue, create the appropiate worker tasks for them and submit them to the ThreadPoolExecutor. This is quite simple. But within a shutdown scenario many workers may be queued to execution. Since one of those tasks might be running for an hour, and i want a relativly fast graceful shutdown of the application i want to discard all queued tasks from the ThreadPoolExecutor while the already processing tasks should be completed normally.

The ThreadPoolExecutor documentation has a remove() method but only allows specific tasks to be removed. purge() only works for already canceled Future tasks. My idea was to clear the queue holding all queued tasks. The ThreadPoolExecutor provides access to this internal queue but the documentation states:

Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged.

So grabbing this queue and clearing it is not an option. Also, this snippet of the documentation says:

Two supplied methods, remove(java.lang.Runnable) and purge() are available to assist in storage reclamation when large numbers of queued tasks become cancelled.

How? Sure, i can maintain a list of all tasks i submitted to the executor and in a shutdown case i iterate over all entries and remove them from the ThreadPoolExecutor with the remove() method... but... come on, this is a waste of memory and a hassle to maintain this list. (Removing already executed tasks for example)

I appreciate any hints or solutions!

Bield answered 5/11, 2009 at 10:16 Comment(0)
C
11

Have you considered wrapping the ExecutorService? Create a

CleanShutdownExecutorService implements Executor 

that delegates all calls to another Executor, but keeps the Futures in a list of its own. CleanShutdownExecutorService can then have a cancelRemainingTasks() method that calls shutdown(), then calls cancel(false) on all the Futures in its list.

Chris answered 5/11, 2009 at 16:0 Comment(1)
This might be the cleanest approach, therefore i accept this answer. :-) Even i hoped there is already something like that... so: lets code this thing. :-)Bield
T
15

I used to work on an app with long running threads. We do this at shutdown,

BlockingQueue<Runnable> queue = threadPool.getQueue();
List<Runnable> list = new ArrayList<Runnable>();
int tasks = queue.drainTo(list);

The list is saved to a file. On startup, the list is added back to the pool so we don't lose any jobs.

Tribade answered 5/11, 2009 at 15:46 Comment(2)
Nice addition to my question. :-) I my case persistence of the tasks is not a problem. But your addition may help others, thanks! :)Bield
I am not sure that is the correct approach. If you shutdown by means of shutdown(), it waits until everything is terminated (even queued tasks), if you do it with shutdownNow(), that method already returns you the drained list of queued tasks. Just FYI in case.Overmaster
C
11

Have you considered wrapping the ExecutorService? Create a

CleanShutdownExecutorService implements Executor 

that delegates all calls to another Executor, but keeps the Futures in a list of its own. CleanShutdownExecutorService can then have a cancelRemainingTasks() method that calls shutdown(), then calls cancel(false) on all the Futures in its list.

Chris answered 5/11, 2009 at 16:0 Comment(1)
This might be the cleanest approach, therefore i accept this answer. :-) Even i hoped there is already something like that... so: lets code this thing. :-)Bield
A
6

As ExecutorService.shutdown() is not doing enough and ExecutorService.shutdownNow() is doing too much I guess you have to write up something in the middle: remember all your submitted tasks and remove them manually after (or before) calling shutdown().

Aport answered 5/11, 2009 at 10:18 Comment(3)
"Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted." I have several tasks submitted which should not execute anymore. But the tasks which are executing should not end.Bield
You edited your answer so my comment is not completly valid anymore. Nevertheless, The shutdownNow() documentations states: "Attempts to stop all actively executing tasks." Which is not what i want. :)Bield
Oh, right, I overread that half of the sentence. Well, I guess your only option then is to remember all submitted Runnables and remove them manually. I’ll edit accordingly.Aport
R
5

This is an old question, but in case this helps somebody else: you could set a volatile boolean when you call shutdown(), and have each submitted task terminate if that boolean is set before really starting. This will allow tasks which have genuinely started to complete, but will prevent queued tasks from starting their actual activity.

Rhonda answered 10/11, 2010 at 18:59 Comment(0)
O
3

You could create your own task queue and pass it to ThreadPoolExecutor constructor:

int poolSize = 1; // number of threads
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>();
Executor executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, queue);

When you clear the queue somewhere in your code then the remaining tasks will not be executed:

queue.clear();
Orwin answered 18/9, 2018 at 12:50 Comment(0)
V
2

Bombe's answer is exactly what you want. shutdownNow() stops everything using the nuke and pave approach. This is the best thing you can do, short of subclassing the implementation of ThreadPoolExecutor that you're using.

Vasos answered 5/11, 2009 at 12:59 Comment(0)
S
1

You can try allowCoreThreadTimeOut(true);

Skulk answered 27/10, 2010 at 9:7 Comment(1)
You should develop your answer and make it clear that you're actually answering the OP.Schlock
D
0

A crazy and unclean solution which might work (not real thought through or tested) would be to overwrite the interrupt() of your WorkerTasks which only in case some global value is set refuse to shutdown when interrupt() is called on them by shutdownNow().

That should allow you to use shutdownNow() no?

Downbow answered 5/11, 2009 at 13:19 Comment(0)
R
0

Tell your thread pool to shutdown, getQueue, for-each the result into individual Runnables, remove each Runnable using the remove method. Depending on the type of queue, you might be able to halt the removes early based on return values.

Basically, this is grabbing the queue and clearing it, only clearing via the methods that work. Instead of manually remembering all the submissions, you use the fact the thread pool already has to remember all the submissions. However, you will probably need to make a defensive copy of the queue, as I think it's a live view, and therefore removing would probably cause a concurrent modification exception if you were iterating/for-eaching over the live view.

Runlet answered 5/11, 2009 at 13:46 Comment(0)
M
0

Doesn't awaitTermination(long timeout, TimeUnit unit) work after shutdown?

executor.shutdown(); executor.awaitTermination(60, TimeUnit.SECONDS)

Magnusson answered 5/11, 2009 at 16:5 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.