ExecutorService that interrupts tasks after a timeout

11

119

I'm looking for an ExecutorService implementation that can be provided with a timeout. Tasks that are submitted to the ExecutorService are interrupted if they take longer than the timeout to run. Implementing such a beast isn't such a difficult task, but I'm wondering if anybody knows of an existing implementation.

Here's what I came up with based on some of the discussion below. Any comments?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture<?>>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
Eleusis answered 3/5, 2010 at 14:10 Comment(6)
Is the 'start time' of the timeout the time of submission? Or the time the task begins executing?Hoag
Good question. When it begins executing. Presumably using the protected void beforeExecute(Thread t, Runnable r) hook.Eleusis
@scompt.com are you still using this solution, or has it been superseded?Molten
@PaulTaylor The job where I implemented this solution has been superseded. :-)Eleusis
I need exactly this, except a) I need my main scheduler service to be a thread pool with a single service thread, since I need my tasks to execute strictly concurrently, and b) I need to be able to specify the timeout duration for each task at the time the task is submitted. I have tried using this as a starting point but extending ScheduledThreadPoolExecutor, and I cannot see a way to pass a timeout duration specified at task submission time through to the beforeExecute method. Any suggestions gratefully appreciated!Grekin
This is what I want to ask.Tenuto
M
106

You can use a ScheduledExecutorService for this. First, submit the task once so that it begins immediately, and retain the Future that is returned. Then schedule a second task that cancels the retained future after some period of time.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
 final Future<?> handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         // cancel(true) interrupts the task if it is still running
         handler.cancel(true);
     }
 }, 10000, TimeUnit.MILLISECONDS);

This lets your handler (the main functionality to be interrupted) run for up to 10 seconds, counted from the moment the cancel task is scheduled, and then cancels (i.e. interrupts) that specific task.
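
To make the idea concrete, here is a minimal, self-contained sketch of the same pattern. The class name CancelAfterTimeoutSketch and the helper submitWithTimeout are hypothetical names chosen for illustration, and using two separate executors (one for work, one for cancelling) is just one possible arrangement.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class CancelAfterTimeoutSketch {

    /** Submits the task to worker and schedules a cancel on canceller after the timeout. */
    static <T> Future<T> submitWithTimeout(ExecutorService worker,
                                           ScheduledExecutorService canceller,
                                           Callable<T> task,
                                           long timeout, TimeUnit unit) {
        final Future<T> future = worker.submit(task);
        // cancel(true) interrupts the worker thread if the task is still running;
        // it has no effect if the task has already completed.
        canceller.schedule(() -> { future.cancel(true); }, timeout, unit);
        return future;
    }

    /** Returns true if a 5-second sleep was cancelled by a 100 ms timeout. */
    static boolean demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService canceller = Executors.newSingleThreadScheduledExecutor();
        try {
            Future<String> future = submitWithTimeout(worker, canceller, () -> {
                Thread.sleep(5_000);
                return "finished";
            }, 100, TimeUnit.MILLISECONDS);
            try {
                future.get();
                return false; // the task finished, which is not expected here
            } catch (CancellationException e) {
                return true;  // get() reports the cancellation this way
            } catch (InterruptedException | ExecutionException e) {
                return false;
            }
        } finally {
            worker.shutdownNow();
            canceller.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled by timeout: " + demo());
    }
}
```

Note that the timeout here starts at submission time, not when the task begins executing, so a task that waits in the queue uses up part of its budget.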

Mulcahy answered 3/5, 2010 at 15:12 Comment(19)
Interesting idea, but what if the task finishes before the timeout (which it normally will)? I'd rather not have tons of cleanup tasks waiting to run only to find out their assigned task has already completed. There'd need to be another thread monitoring the Futures as they finish to remove their cleanup tasks.Eleusis
The executor will only schedule this cancel once. If the task is completed then the cancel is a no-op and work continues unchanged. There only needs to be one extra thread scheduling the cancellation of the tasks and one thread to run them. You could have two executors, one to submit your main tasks and one to cancel them.Mulcahy
That's true, but what if the timeout is 5 hours and in that time 10k tasks are executed. I'd like to avoid having all those no-ops lying around taking up memory and causing context switches.Eleusis
Understandable. However, if the timeout is 5 hours, the thread will park for (5, TimeUnit.HOURS). Since it's parked, there will be no context switching until it is unparked, and the interruption will occur then.Mulcahy
@BalusC: Ideally, the tasks themselves would remain unchanged.Eleusis
@John W.: Assuming that all tasks complete on time, there would still be 10k unnecessary interruptions, regardless of how long the timeout is.Eleusis
@Scompt Not necessarily. There would be 10k future.cancel() invocations; however, if the future is completed then the cancel will fast-path out and not do any unnecessary work. If you do not want 10k extra cancel invocations then this may not work, but the amount of work done when a task is completed is very small.Mulcahy
These problems could be resolved with a ExecutorCompletionService. I'm looking into that now.Eleusis
@John W.: I just realized another issue with your implementation. I need the timeout to begin when the task starts execution, as I commented earlier. I think the only way to do that is to use the beforeExecute hook.Eleusis
This answer got me thinking about the solution that I ended up with (posted in the question above), so I'm going to accept it. Thanks!Eleusis
Thank you, scompt. I looked at your code and can't find an error in it. The thread-safety aspect of it is well done.Mulcahy
Yea there was no specific reason (whether good or bad) to use millisecondsMulcahy
why are we using 2 for the core pool size?Dabble
Here is an example that actually uses the value to help others like me who had trouble understanding: gist.github.com/nddipiazza/2ba54f5c09f2f51b50d10e4d0555f394Dabble
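First submit a task that can run immediately. Then submit another task that cancels the first one.Tenuto
Damn it, will this get censored? I'm a civilized person; this is just a test.Tenuto
@MikeNereson Easy. This is no big deal.Tenuto
It does not interrupt in my case. Why?Tenuto
Same, it seems it does not interrupt. Any suggestion why?Verniavernice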
M
10

Unfortunately the solution is flawed. There is a sort of bug with ScheduledThreadPoolExecutor, also reported in this question: cancelling a submitted task does not fully release the memory resources associated with the task; the resources are released only when the task expires.

If you therefore create a TimeoutThreadPoolExecutor with a fairly long expiration time (a typical usage), and submit tasks fast enough, you end up filling the memory - even though the tasks actually completed successfully.

You can see the problem with the following (very crude) test program:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

The program exhausts the available memory, although it waits for the spawned Runnables to complete.

I thought about this for a while, but unfortunately I could not come up with a good solution.

Update

I discovered this issue was reported as JDK bug 6602600, and appears to have been fixed in Java 7.
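
For anyone who wants to check the behaviour on their own JDK: Java 7 added ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy, which I believe is the change that came out of that bug report. The policy is off by default, so the sketch below turns it on explicitly and compares the queue size after cancelling long-delay tasks. The class name RemoveOnCancelSketch and the helper queuedAfterCancel are hypothetical names for illustration only.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RemoveOnCancelSketch {

    /** Schedules and immediately cancels count long-delay tasks; returns how many stay queued. */
    static int queuedAfterCancel(boolean removeOnCancel, int count) {
        ScheduledThreadPoolExecutor timeoutExecutor = new ScheduledThreadPoolExecutor(1);
        // With the policy off (the default), a cancelled task stays in the
        // work queue until its delay elapses.
        timeoutExecutor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            for (int i = 0; i < count; i++) {
                ScheduledFuture<?> timeoutTask =
                        timeoutExecutor.schedule(() -> { }, 10, TimeUnit.MINUTES);
                timeoutTask.cancel(false);
            }
            return timeoutExecutor.getQueue().size();
        } finally {
            timeoutExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy:   " + queuedAfterCancel(false, 1000));
        System.out.println("remove on cancel: " + queuedAfterCancel(true, 1000));
    }
}
```

In the TimeoutThreadPoolExecutor from the question, this would mean constructing the timeout executor as a ScheduledThreadPoolExecutor directly, since the wrapper returned by Executors.newSingleThreadScheduledExecutor() does not expose the setter.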

Millwork answered 11/10, 2012 at 16:14 Comment(0)
H
6

Wrap the task in a FutureTask and you can specify a timeout for the FutureTask. Look at the example in my answer to this question: "java native Process timeout".
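
Since the linked answer is not reproduced here, the following is only a rough sketch of what wrapping a task in a FutureTask with a timeout can look like. The class name FutureTaskTimeoutSketch, the helper callWithTimeout and the fallback parameter are assumptions made for illustration, not code from that answer.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTaskTimeoutSketch {

    /** Runs the task on its own thread; returns its result, or fallback if it exceeds the timeout. */
    static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit, T fallback) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        Thread runner = new Thread(futureTask, "timeout-runner");
        runner.setDaemon(true);
        runner.start();
        try {
            // Waits at most the given time for the result.
            return futureTask.get(timeout, unit);
        } catch (TimeoutException e) {
            futureTask.cancel(true); // interrupts the runner thread
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            futureTask.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(() -> "fast", 1, TimeUnit.SECONDS, "timed out"));
        System.out.println(callWithTimeout(() -> {
            Thread.sleep(5_000);
            return "slow";
        }, 100, TimeUnit.MILLISECONDS, "timed out"));
    }
}
```

The caller blocks while waiting, so this fits best where the submitting thread needs the result anyway.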

Hydrostatics answered 3/5, 2010 at 14:46 Comment(2)
I realize there are a couple ways to do this using the java.util.concurrent classes, but I'm looking for an ExecutorService implementation.Eleusis
If you are saying that you want your ExecutorService to hide the fact that timeouts are being added from client code, you could implement your own ExecutorService that wraps every runnable handed to it with a FutureTask before executing them.Mckellar
F
4

After spending a lot of time surveying the options,
I finally used the invokeAll method of ExecutorService to solve this problem.
It interrupts any task that is still running when the timeout expires.
Here is an example:

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

An advantage is that you can also get ListenableFutures from the same ExecutorService.
Just slightly change the first line of the code.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

(ListeningExecutorService is the listening variant of ExecutorService from the Google Guava project, com.google.guava.)
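
The loop marked "Getting result" above leaves out how to tell a finished task from a timed-out one. One way to do it, sketched below with plain java.util.concurrent and hypothetical names (InvokeAllTimeoutSketch, runAll, demo), is to check isCancelled() on each returned Future before calling get().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutSketch {

    /** Runs all tasks with a shared deadline; timed-out tasks are reported as "CANCELLED". */
    static List<String> runAll(List<Callable<String>> tasks, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<String> results = new ArrayList<>();
        try {
            // Blocks until every task has finished or the timeout has expired.
            List<Future<String>> futures = executor.invokeAll(tasks, timeout, unit);
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    results.add("CANCELLED");
                } else {
                    try {
                        results.add(future.get()); // already done, so this does not wait
                    } catch (ExecutionException e) {
                        results.add("FAILED: " + e.getCause());
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return results;
    }

    /** One quick task and one that sleeps far longer than the 200 ms timeout. */
    static List<String> demo() {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "quick");
        tasks.add(() -> {
            Thread.sleep(5_000);
            return "slow";
        });
        return runAll(tasks, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keep in mind that the timeout applies to the whole batch, and that invokeAll itself blocks the calling thread until the batch is done or the timeout expires.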

Fiore answered 2/8, 2016 at 1:56 Comment(3)
Thanks for pointing out invokeAll. That works very well. Just a word of caution for anyone thinking about using this: although invokeAll returns a list of Future objects, it actually seems to be a blocking operation.Modifier
Wouldn't it be blocking anyway once we call future.get()?Lisettelisha
Excellent solution with invokeAll!Soup
S
1

How about using the ExecutorService.shutdownNow() method as described in http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html? It seems to be the simplest solution.

Salem answered 4/3, 2013 at 19:27 Comment(1)
Because it will stop all the scheduled tasks and not a specific task as was requested by the questionMirandamire
C
1

It seems the problem is not JDK bug 6602600 (it was resolved on 2010-05-22), but the incorrect call to sleep(10) in the loop. Note also that the main thread must explicitly give the other threads a chance to run their tasks by invoking sleep(0) in every iteration of the outer loop. I think it is better to use Thread.yield() instead of Thread.sleep(0).

The corrected part of the earlier test code looks like this:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

It works correctly with the outer loop counter tested up to 150 000 000 iterations.

Commemorative answered 27/12, 2013 at 8:24 Comment(0)
B
1

Using John W's answer, I created an implementation that correctly begins the timeout when the task starts its execution. I even wrote a unit test for it :)

However, it does not suit my needs, since some IO operations do not interrupt when Future.cancel() is called (i.e. when Thread.interrupt() is called). Examples of IO operations that may not be interrupted when Thread.interrupt() is called are Socket.connect and Socket.read (and I suspect most IO operations implemented in java.io). All IO operations in java.nio should be interruptible when Thread.interrupt() is called. For example, that is the case for SocketChannel.open and SocketChannel.read.

Anyway if anyone is interested, I created a gist for a thread pool executor that allows tasks to timeout (if they are using interruptible operations...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
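
The same limitation applies to any code that never reaches an interruptible call: Future.cancel(true) only sets the thread's interrupt flag, so the task has to cooperate. Below is a minimal sketch of a CPU-bound task that polls the flag itself. The class and method names are hypothetical, and the sketch does not cover blocking java.io socket calls; for those, a commonly used workaround is to close the socket from another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InterruptCooperationSketch {

    /** Cancels a busy loop after 100 ms; returns true if the loop noticed the interrupt and exited. */
    static boolean busyLoopStopsOnInterrupt() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                long spins = 0;
                // No blocking call in here, so the task must poll the interrupt flag itself.
                while (!Thread.currentThread().isInterrupted()) {
                    spins++;
                }
                exited.countDown();
            });
            started.await();     // make sure the task is really running
            Thread.sleep(100);
            future.cancel(true); // sets the interrupt flag on the worker thread
            return exited.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("loop exited after cancel: " + busyLoopStopsOnInterrupt());
    }
}
```

Without the isInterrupted() check in the loop condition, the task would keep running after cancel(true), even though the Future already reports itself as cancelled.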

Bryant answered 16/7, 2017 at 21:4 Comment(6)
Interesting code, I pulled it into my system and curious if you have some examples of what kind of IO operations will not interrupt so I can see if it will impact my system. Thanks!Ahmad
@DuncanKrebs I detailed my answer with an example of non interruptible IO: Socket.connect and Socket.readBryant
myThread.interrupted() is not the correct method to interrupt, as it CLEARS the interruption flag. Use myThread.interrupt() instead, and that should work with sockets.Vespine
@DanielCuadra: Thank you, it looks like I made a typo, as Thread.interrupted() does not interrupt a thread. However, Thread.interrupt() does not interrupt java.io operations; it works only on java.nio operations.Bryant
I have used interrupt() for many years and it has always interrupted java.io operations (as well as other blocking methods, such as thread sleep, jdbc connections, blockingqueue take, etc). Maybe you found a buggy class or some JVM that has bugsVespine
@DanielCuadra: I am not sure this is a JVM bug, it is the way it has been designed. A process cannot be abruptly stopped unless you kill the whole JVM. I made a gist using Thread.interrupt() and netcat for listening and I confirm that it does not kill the thread: gist.github.com/amanteaux/675491e8509f718040f18614bf51c573 Do not hesitate to comment the gist if the test protocol does not look right to you.Bryant
P
1

What about this?

final ExecutorService myExecutorService = ...;

// create CompletableFuture to get result/exception from runnable in specified timeout
final CompletableFuture<Object> timeoutFuture = new CompletableFuture<>();

// submit runnable and obtain cancellable Future from executor
final Future<?> cancellableFuture = myExecutorService.submit(() -> {
    try {
        Object result = myMethod(...);
        timeoutFuture.complete(result);
    } catch (Exception e) {
        timeoutFuture.completeExceptionally(e);
    }
});

// block the calling thread until "myMethod" finishes or times out (1 second)
try {
    Object result = timeoutFuture.get(1000, TimeUnit.MILLISECONDS);
    // "myMethod" completed normally
} catch (TimeoutException te) {
    // "myMethod" timed out
    // ...
} catch (ExecutionException ee) {
    // "myMethod" completed exceptionally - get cause
    final Throwable cause = ee.getCause();
    // ...
} catch (InterruptedException ie) {
    // future interrupted
    // ...
} finally {
    // timeoutFuture.cancel(true); // CompletableFuture.cancel does not interrupt the running task
    cancellableFuture.cancel(true); // Future supports cancellation
}
Phraseology answered 17/3, 2021 at 14:41 Comment(0)
W
0

What about this alternative idea:

  • have two executors:
    • one for:
      • submitting the task, without caring about the timeout of the task
      • adding the resulting Future and the time when it should end to an internal structure
    • one for executing an internal job that checks the internal structure for tasks that have timed out and need to be cancelled.

A small sample is here:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList<>();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    // Calendar.MILLISECOND is only the 0-999 fraction of the current second, not a timestamp
    return System.currentTimeMillis();
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        // convert the timeout from the task's own unit into milliseconds
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().toMillis(task.getTimeoutDuration());
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        // the task has timed out once its end time is at or before the current time
        boolean isTimeout = futureTask.getMilliSecEndTime() <= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}
Wicks answered 7/8, 2015 at 13:20 Comment(0)
I
0

Check if this works for you:

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && !callableLinkedBlockingQueue.isEmpty()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

This lets you restrict the number of threads used from the pool as well as put a timeout on each task.

Industrial answered 14/10, 2019 at 5:59 Comment(0)
L
0

You can use this implementation that ExecutorService provides

invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
as

executor.invokeAll(Arrays.asList(task), 2 , TimeUnit.SECONDS);

However, in my case I could not use it, as Arrays.asList took an extra 20 ms.

Lisettelisha answered 2/1, 2021 at 18:45 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.