Handling exceptions from Java ExecutorService tasks
Asked Answered
B

13

250

I'm trying to use Java's ThreadPoolExecutor class to run a large number of heavy weight tasks with a fixed number of threads. Each of the tasks has many places during which it may fail due to exceptions.

I've subclassed ThreadPoolExecutor and I've overridden the afterExecute method which is supposed to provide any uncaught exceptions encountered while running a task. However, I can't seem to make it work.

For example:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

The output from this program is "Everything's fine--situation normal!" even though the only Runnable submitted to the thread pool throws an exception. Any clue to what's going on here?

Thanks!

Bemoan answered 11/2, 2010 at 22:9 Comment(2)
you never queried the Future of the task, what what happened there. The entire service executor or program is not going to be crashed. The exception is catched and is wrapped under ExecutionException. And will he rethrown if you call future.get(). PS: The future.isDone() [Please read the real api name] will return true, even when the runnable finished erroneously. Because the task is done for real.Coltin
Interestingly, but I see that isDone() might return false for ScheduledThreadPoolExecutor. At the first glance this contradics the method contract, because it must be called when task is executed and future is completed normally or exacptionally. So, calling .get() without .isDone() condition produces infinite lock.Sampler
E
177

From the docs:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.

When you submit a Runnable, it'll get wrapped in a Future.

Your afterExecute should be something like this:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
Eustace answered 11/2, 2010 at 22:21 Comment(9)
Thanks, I ended up using this solution. Additionally, in case anyone is interested: others have suggested not subclassing the ExecutorService, but I did anyway because I wanted to monitor tasks as they complete rather than waiting for all of them to terminate and then calling get() on all of the returned Futures.Bemoan
Another approach to subclassing the executor is to subclass FutureTask and override its 'done' methodEustace
Tom >> Can you please post your sample snippet code where you subclassed ExecutorService to monitor tasks as they complete...Weatherboarding
This answer won't work if you are using ComplableFuture.runAsync as afterExecute will contain an object that is package private and no way to access the throwable. I got around it by wrapping the call. See my answer below.Channelize
Guys, why do we need executor in this case? If you still do "get" in the main thread and wait till execution complete? I can't catch this.Preconize
So, to not block the calling thread, Is it acceptable solution to add the Futures to a ThreadSafe List and then have a background thread that call future.get on these ?Sawdust
Is it required to catch RuntimeException? catch (RuntimeException e) { t = e; } and how to Combine all the catch with CancellationException, which has the same bodyQuartermaster
Do we have to check if the future is completed using future.isDone()? Since afterExecute is run after the Runnable is completed, I assume future.isDone() always returns true.Thriftless
@Searene: The isDone() check is necessary in certain cases to avoid blocking. See bugs.openjdk.org/browse/JDK-8071638 and bugs.openjdk.org/browse/JDK-7146994 for details. (If I understand correctly, it's not actually necessary when the base class is ThreadPoolExecutor, as in this case. But it doesn't hurt either.)Koonce
W
264

WARNING: It should be noted that this solution will block the calling thread in future.get().


If you want to process exceptions thrown by the task, then it is generally better to use Callable rather than Runnable.

Callable.call() is permitted to throw checked exceptions, and these get propagated back to the calling thread:

Callable task = ...
Future future = executor.submit(task);
// do something else in the meantime, and then...
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

If Callable.call() throws an exception, this will be wrapped in an ExecutionException and thrown by Future.get().

This is likely to be much preferable to subclassing ThreadPoolExecutor. It also gives you the opportunity to re-submit the task if the exception is a recoverable one.

Wardieu answered 11/2, 2010 at 22:15 Comment(13)
> Callable.call() is permitted to throw checked exceptions, and these get propagated back to the calling thread: Note that the thrown exception will propagate to the calling thread only if future.get() or its overloaded version is called.Waive
How does one keep track when a callable is done in this solution? I want to log exceptions when they happen.Cubitiere
It is perfect, but what to do if I run tasks in parallel and do not want to block execution?Love
@GKislin: looks you need to override java.util.concurrent.FutureTask#done and implement custom exception handlingAspect
Don't use this solution, as it breaks the whole purpose of using the ExecutorService. An ExecutorService is an asynchronous execution mechanism which is capable of executing tasks in the background. If you call future.get() right after execute it will block the calling thread until the task is finished.Presswork
but if I use caller.call() on the same thread (I do it optionally), I am forced to catch Exception, and that will prevent me from debugging (eclipse) in the exact spot where it is thrown, I will have only a cause stack, but not access to variables values in the exact spot then on the IDE. see herePalaeontology
This solution should not be so high rated. Future.get() works synchronously and will act as a blocker until the Runnable or Callable have been executed and as stated above defeats the purpose of using the Executor ServiceInwrap
As #nhylated pointed out, this deserves a jdk BUG. If Future.get() is not called, any uncaught exception from Callable is silently ignored. Very bad design.... just spent 1+ day to figure out a library used this and jdk silently ignored exceptions. And, this still exists in jdk12.Coherence
Submit/Callable gets too much undue hate because it is frequently misused. It is for situations where you want to run several time-intensive background operations which return a value or need to ensure completion before continuing, with what's effectively a completion fence at the end of the method from which you are scheduling which invokes .get() from all of these Futures. Yes, it will block but if you need a return value or completion of an operation the probability of a thread block will always be non-zero.Chinaware
Pardon my ignorance here, but as most people here are saying: How is a .run() which is a Reactive action, the same as a return function .call(); which is a Proactive action. They are diametrically opposite. .run() expects a side effect responsibility (DELIVERY) from the machine, while the .call() is a pure return function (RETRIEVAL). This is the difference between event driven vs message driven, so people are required to change the entire architecture to make use of exceptions?Bushore
@Bushore The terms "Reactive" and "Proactive" are not common in the Java world. Java is neither a functional nor a "reactive" language. It's an object-oriented and imperative language. It doesn't restrict how one can use Runnable and Callable, and implementations of these interfaces are indeed used for similar things in Java. Callable returns a value, but that doesn't mean it can't have side effects.Koonce
Thanks for your answer @jcsahnwaldtReinstateMonica, you are correct they are not, and I believe part of the reason why is because the pattern became a thing so much later during the lifespan of Java, that it is seldom used. Imho, both terms should be used a lot more...., ever since the usage of lambdas, lifecycles became an integral part of any system that uses them. The lack of understanding of them and the principles, have helped in leading to their misuse (memory leaks) in the observer pattern, and have played a big part on the hard ...Bushore
(lack of widespread) adoption of reactivity as a tool for programming. When you mix OOP with lifecycles, simple terms like "getters" and "setter" are not enough to convey what the system is capable of. Both terms "pro/re-active" are even more abstract and effectively categorize both actions "get", "set" + "accept(as producer)" and "consume(as consumer)" Both "reactive" and "proactive" have been part of my lexicon and have helped me a lot to understand the systems that I develop myself.Bushore
E
177

From the docs:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.

When you submit a Runnable, it'll get wrapped in a Future.

Your afterExecute should be something like this:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
Eustace answered 11/2, 2010 at 22:21 Comment(9)
Thanks, I ended up using this solution. Additionally, in case anyone is interested: others have suggested not subclassing the ExecutorService, but I did anyway because I wanted to monitor tasks as they complete rather than waiting for all of them to terminate and then calling get() on all of the returned Futures.Bemoan
Another approach to subclassing the executor is to subclass FutureTask and override its 'done' methodEustace
Tom >> Can you please post your sample snippet code where you subclassed ExecutorService to monitor tasks as they complete...Weatherboarding
This answer won't work if you are using ComplableFuture.runAsync as afterExecute will contain an object that is package private and no way to access the throwable. I got around it by wrapping the call. See my answer below.Channelize
Guys, why do we need executor in this case? If you still do "get" in the main thread and wait till execution complete? I can't catch this.Preconize
So, to not block the calling thread, Is it acceptable solution to add the Futures to a ThreadSafe List and then have a background thread that call future.get on these ?Sawdust
Is it required to catch RuntimeException? catch (RuntimeException e) { t = e; } and how to Combine all the catch with CancellationException, which has the same bodyQuartermaster
Do we have to check if the future is completed using future.isDone()? Since afterExecute is run after the Runnable is completed, I assume future.isDone() always returns true.Thriftless
@Searene: The isDone() check is necessary in certain cases to avoid blocking. See bugs.openjdk.org/browse/JDK-8071638 and bugs.openjdk.org/browse/JDK-7146994 for details. (If I understand correctly, it's not actually necessary when the base class is ThreadPoolExecutor, as in this case. But it doesn't hurt either.)Koonce
B
20

The explanation for this behavior is right in the javadoc for afterExecute:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.

Bandeau answered 11/2, 2010 at 22:21 Comment(0)
C
17

I got around it by wrapping the supplied runnable submitted to the executor.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
Channelize answered 14/12, 2014 at 11:14 Comment(2)
You can improve readability by using whenComplete() method of CompletableFuture.Marguritemargy
@EduardWirch this works but you cannot throw back an exception from the whenComplete()Lockard
R
5

I'm using VerboseRunnable class from jcabi-log, which swallows all exceptions and logs them. Very convenient, for example:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
Rev answered 10/5, 2012 at 15:49 Comment(0)
A
5

Another solution would be to use the ManagedTask and ManagedTaskListener.

You need a Callable or Runnable which implements the interface ManagedTask.

The method getManagedTaskListener returns the instance you want.

public ManagedTaskListener getManagedTaskListener() {

And you implement in ManagedTaskListener the taskDone method:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

More details about managed task lifecycle and listener.

Adytum answered 11/11, 2015 at 8:17 Comment(0)
C
3

This works

  • It is derived from SingleThreadExecutor, but you can adapt it easily
  • Java 8 lamdas code, but easy to fix

It will create a Executor with a single thread, that can get a lot of tasks; and will wait for the current one to end execution to begin with the next

In case of uncaugth error or exception the uncaughtExceptionHandler will catch it

public final class SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable)  -> {
            final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
            });
            return newThread;
        };
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue(),
                        factory){


                    protected void afterExecute(Runnable runnable, Throwable throwable) {
                        super.afterExecute(runnable, throwable);
                        if (throwable == null && runnable instanceof Future) {
                            try {
                                Future future = (Future) runnable;
                                if (future.isDone()) {
                                    future.get();
                                }
                            } catch (CancellationException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt(); // ignore/reset
                            }
                        }
                        if (throwable != null) {
                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                        }
                    }
                });
    }



    private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    private static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future submit(Runnable task) {
            return e.submit(task);
        }
        public  Future submit(Callable task) {
            return e.submit(task);
        }
        public  Future submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public  List> invokeAll(Collection> tasks)
                throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public  List> invokeAll(Collection> tasks,
                                             long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public  T invokeAny(Collection> tasks)
                throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public  T invokeAny(Collection> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }



    private SingleThreadExecutorWithExceptions() {}
}
Cinthiacintron answered 16/5, 2017 at 16:41 Comment(1)
Using finalize is a bit unstable unfortunately, since it'll only be called "later when the garbage collector collects it" (or perhaps not in the case of a Thread, dunno)...Schistosome
K
1

This is because of AbstractExecutorService :: submit is wrapping your runnable into RunnableFuture (nothing but FutureTask) like below

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Then execute will pass it to Worker and Worker.run() will call the below.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Finally task.run(); in the above code call will call FutureTask.run(). Here is the exception handler code, because of this you are NOT getting the expected exception.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Karolinekaroly answered 12/5, 2016 at 8:32 Comment(1)
I guess this is done to force the programmer to intercept exceptions and handle them more granularly(??)Bushore
N
1

If you want to monitor the execution of task, you could spin 1 or 2 threads (maybe more depending on the load) and use them to take tasks from an ExecutionCompletionService wrapper.

Noheminoil answered 31/7, 2016 at 13:23 Comment(0)
P
1

This is similar to mmm's solution, but a bit more understandable. Have your tasks extend an abstract class that wraps the run() method.

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
Prewitt answered 29/3, 2020 at 1:24 Comment(0)
B
1

The doc's example wasn't giving me the results I wanted.

When a Thread process was abandoned (with explicit interput();s) Exceptions were appearing.

Also I wanted to keep the "System.exit" functionality that a normal main thread has with a typical throw, I wanted this so that the programmer was not forced to work on the code having to worry on it's context (... a thread), If any error appears, it must either be a programming error, or the case must be solved in place with a manual catch... no need for overcomplexities really.

So I changed the code to match my needs.

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
        super.afterExecute(r, t); 
        if (t == null && r instanceof Future<?>) { 
            Future<?> future = (Future<?>) r; 
            boolean terminate = false; 
                try { 
                    future.get(); 
                } catch (ExecutionException e) { 
                    terminate = true; 
                    e.printStackTrace(); 
                } catch (InterruptedException | CancellationException ie) {// ignore/reset 
                    Thread.currentThread().interrupt(); 
                } finally { 
                    if (terminate) System.exit(0); 
                } 
        } 
    }

Be cautious though, this code basically transforms your threads into a main thread Exception-wise, while keeping all it's parallel properties... But let's be real, designing architectures in function of the system's parallel mechanism (extends Thread) is the wrong approach IMHO... unless an event driven design is strictly required....but then... if that is the requirement the question is: Is the ExecutorService even needed in this case?... maybe not.

Bushore answered 21/7, 2022 at 18:51 Comment(0)
M
0

If your ExecutorService comes from an external source (i. e. it's not possible to subclass ThreadPoolExecutor and override afterExecute()), you can use a dynamic proxy to achieve the desired behavior:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
Mora answered 20/10, 2015 at 14:51 Comment(0)
W
-5

Instead of subclassing ThreadPoolExecutor, I would provide it with a ThreadFactory instance that creates new Threads and provides them with an UncaughtExceptionHandler

Wiltz answered 11/2, 2010 at 22:19 Comment(3)
I tried this as well, but the uncaughtException method never seems to get called. I believe this is because a worker thread in the ThreadPoolExecutor class is catching the exceptions.Bemoan
The uncaughtException method is not called because the ExecutorService's submit method is wrapping the Callable/Runnable in a Future; the exception is being captured there.Bodrogi
it should work if you use execute(): void, instead of submit():Future, though.Noheminoil

© 2022 - 2024 — McMap. All rights reserved.