What is terminating my Java ExecutorService

I originally saw this issue with a more complex subclass of ThreadPoolExecutor, but I have since simplified it so that it contains little more than some additional debugging, and I still get the same problem.

import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.logging.Level;



public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    /**
     * Uses the default CallerRunsPolicy when queue is full
     *  @param workerSize
     * @param threadFactory
     * @param queue
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new CallerRunsPolicy());
    }

    /**
     * Allow caller to specify the RejectedExecutionPolicy
     *  @param workerSize
     * @param threadFactory
     * @param queue
     * @param reh
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue, RejectedExecutionHandler reh)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, reh);
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    /**
     * Check not been paused
     *
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        SongKong.checkIn();
    }

    /**
     * After execution
     *
     * @param r
     * @param t
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t)
    {
        super.afterExecute(r, t);

        if (t == null && r instanceof Future<?>)
        {
            try
            {
              Object result = ((Future<?>) r).get();
            }
            catch (CancellationException ce)
            {
                t = ce;
            }
            catch (ExecutionException ee)
            {
                t = ee.getCause();
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
        {
            MainWindow.logger.log(Level.SEVERE, "AFTER EXECUTE---" + t.getMessage(), t);
        }
    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
    }

    @Override
    public void shutdown()
    {
        MainWindow.logger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
        super.shutdown();
    }
}

This ExecutorService is used by the following class, which allows callers to submit tasks asynchronously. The ExecutorService should not be shut down until all submitted tasks have completed.

package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.preferences.GeneralPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 *  Sets a timeout on each task submitted and cancels it if it takes longer than the timeout
 *
 *  The timeout is set to 30 minutes; we only want it to trigger if something is really broken, it should not happen under usual circumstances
 */
public class MainAnalyserService extends AnalyserService
{
    //For monitoring/controlling when finished
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    //If task has not completed 30 minutes after it started (added to queue) then it should be cancelled
    private static final int TIMEOUT_PER_TASK = 30;

    private static MainAnalyserService mas;

    public static MainAnalyserService getInstanceOf()
    {
        return mas;
    }

    public static MainAnalyserService create(String threadGroup)
    {
        mas = new MainAnalyserService(threadGroup);
        return mas;
    }

    public MainAnalyserService(String threadGroup)
    {
        super(threadGroup);
        initExecutorService();
    }

    /**
    Configure the number of threads to match the number of cpus, but even with a single cpu ensure we have at least
    two threads, to protect against the scenario where there is only one cpu and its thread is waiting on i/o rather
    than being cpu bound; this allows the other thread to do something.
     */
    @Override
    protected void initExecutorService()
    {
        int workerSize = GeneralPreferences.getInstance().getWorkers();
        if(workerSize==0)
        {
            workerSize = Runtime.getRuntime().availableProcessors();
        }
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }
        MainWindow.userInfoLogger.severe("Workers Configuration:"+ workerSize);
        MainWindow.logger.severe("Workers Configuration:"+ workerSize);

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES,
                new EnsureIncreaseCountIfRunOnCallingThread());
    }

    public AtomicInteger getPendingItems()
    {
        return pendingItems;
    }

    /**
     * If queue is full this gets called and we log that we run task on local calling thread.
     */
    class EnsureIncreaseCountIfRunOnCallingThread implements RejectedExecutionHandler
    {
        /**
         * Creates an {@code EnsureIncreaseCountIfRunOnCallingThread} handler.
         */
        public EnsureIncreaseCountIfRunOnCallingThread() { }

        /**
         * Executes task on calling thread, ensuring we increment count
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown())
            {
                try
                {
                    MainWindow.userInfoLogger.severe(">>SubmittedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" + pendingItems.get());
                    r.run();
                    MainWindow.userInfoLogger.severe(">>CompletedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" +  pendingItems.get());
                }
                catch(Exception ex)
                {
                    MainWindow.userInfoLogger.log(Level.SEVERE, ex.getMessage(), ex);
                }
            }
        }
    }

    /**
     * Increase count and then Submit to ExecutorService
     *
     * @param callingTask
     * @param task
     */
    public void submit(Callable<Boolean> callingTask, Callable<Boolean> task) //throws Exception
    {
        //Ensure we increment before calling submit in case rejectionExecution comes into play
        int remainingItems = pendingItems.incrementAndGet();
        executorService.submit(task);
        MainWindow.userInfoLogger.severe(">>Submitted:" + task.getClass().getName() + ":" + remainingItems);
    }

    public ExecutorService getExecutorService()
    {
        return executorService;
    }

    /**
     * Must be called by Callable when it has finished work (or if error)
     *
     * @param task
     */
    public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.userInfoLogger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);
        if (remainingItems == 0)
        {
            MainWindow.userInfoLogger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

    /**
     * Wait for latch to close, this should occur once all submitted aysync tasks have finished in some way
     *
     * @throws InterruptedException
     */
    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}
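
One consequence of the `if (!e.isShutdown())` guard in the rejection handler above is that any task submitted after shutdown() has been called is dropped without a log line or an exception. A minimal sketch of that behaviour, using a plain ThreadPoolExecutor and a made-up class name (SilentRejectionDemo) rather than the real SongKong classes:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SilentRejectionDemo {
    /** Submits one task after shutdown() and returns how many tasks actually ran. */
    static int runAfterShutdown() {
        AtomicInteger ran = new AtomicInteger();

        // Same shape as the handler in the question: run on the caller
        // unless the pool is shut down, in which case do nothing at all.
        RejectedExecutionHandler handler = (r, e) -> {
            if (!e.isShutdown()) {
                r.run();
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10), handler);

        pool.shutdown();
        // Rejected because the pool is no longer running; the handler ignores it.
        pool.execute(() -> ran.incrementAndGet());
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run after shutdown: " + runAfterShutdown());
    }
}
```

With submit() instead of execute() the effect is the same, and in addition the returned Future never completes, so a get() on it without a timeout would block indefinitely.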

The calling Class has

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

For one customer the terminated() method was called even though there were still tasks that had not completed, the ExecutorService had only been running for 8 minutes, and no tasks had timed out. I have also seen the problem locally.

Debugging shows

UserLog

05/07/2019 11.29.38:EDT:SEVERE: ----G14922:The Civil War:8907617:American Songs of Revolutionary Times and the Civil War Era:NoScore
05/07/2019 11.29.38:EDT:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.SongSaver:69
05/07/2019 11.29.38:EDT:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:68
05/07/2019 11.29.38:EDT:SEVERE: >MainAnalyser Finished
05/07/2019 11.29.38:EDT:INFO: Stop

DebugLog

   05/07/2019 11.29.38:EDT:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker

So we can see there are still 68 tasks to complete and MainAnalyser has not closed the latch, yet the thread pool executor has terminated.

I overrode shutdown() to see whether it is called, and it is not.

terminated() is being called from runWorker(). runWorker() should continue looping until the queue is empty, which it is not, but something seems to cause it to leave the loop, and processWorkerExit(), after doing some more checks, eventually terminates the whole Executor (not just a worker thread).

10/07/2019 07.11.51:BST:MainAnalyserService:submit:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:809
10/07/2019 07.11.51:BST:MainAnalyserService:workDone:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.MusicBrainzSongGroupMatcher2:808
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.getStackTrace(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: com.jthink.songkong.analyse.analyser.TimeoutThreadPoolExecutor.terminated(TimeoutThreadPoolExecutor.java:118)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.tryTerminate(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.run(Unknown Source)
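
For context on the trace above: as far as the JDK source goes, tryTerminate() returns early while the pool is still in the RUNNING state, so terminated() is only reached after shutdown() or shutdownNow() has been called at some earlier point. The call itself is made by whichever thread completes the transition, which is often the last worker to exit. A minimal sketch (TerminationProbe is a made-up name for illustration) in which an ordinary shutdown() from the main thread leads to terminated() running on the worker thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class TerminationProbe extends ThreadPoolExecutor {
    // Name of the thread that ran terminated(), or null if it has not run yet
    final AtomicReference<String> terminatedOn = new AtomicReference<>();

    TerminationProbe() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
              r -> new Thread(r, "Worker"));
    }

    @Override
    protected void terminated() {
        terminatedOn.set(Thread.currentThread().getName());
    }

    /** Shuts the pool down while a task is running and reports which thread ran terminated(). */
    static String demo() throws InterruptedException {
        TerminationProbe pool = new TerminationProbe();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        started.await();
        pool.shutdown();       // called on the main thread while the task is still running
        release.countDown();   // the task finishes, the worker finds an empty queue and exits
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return pool.terminatedOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated() ran on: " + demo());
    }
}
```

So a stack that shows runWorker -> processWorkerExit -> tryTerminate -> terminated does not by itself rule out an earlier shutdown() call from another thread.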

Because ThreadPoolExecutor is part of standard Java I cannot (easily) set breakpoints to find out what it is doing. This is the ThreadPoolExecutor code (standard Java, not my code):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

We experimented with the queue size in the Executor. By default it was 100, because I did not want it to get too large (queued tasks use more memory) and I would rather the calling task just ran the work itself if the queue is busy. But in an attempt to solve the issue (and remove the need for the CallerRunsPolicy to kick in because the queue is full) I increased the queue size to 1000, which caused the error to occur more quickly, and then removed the limit completely, and it continued to fail more rapidly.

 new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),

I was looking at an alternative to ThreadPoolExecutor and came across ForkJoinPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

One thing I noticed is that ForkJoinPool has different methods for submitting tasks from within a task already running in the ForkJoinPool compared to submitting from outside. I don't know why this is, but I am wondering whether, because I am submitting tasks from within tasks being run by the Executor, this could cause an issue in some way?

I have now managed to create my own version of ThreadPoolExecutor by simply copying and pasting the code into a new class, renaming it, and also creating a version of RejectedExecutionHandler that expects my class rather than ThreadPoolExecutor, and I have got this running.

I have started to add some debugging to see if I can decipher what is going on. Any ideas?

Before the call to processWorkerExit I added

 MainWindow.userInfoLogger.severe("-----------------------"+getTaskCount()
                    +":"+getActiveCount()
                    +":"+w.completedTasks
                    +":"+ completedAbruptly);

and got on failure

-----------------------3686:0:593:false
Meatball answered 8/7, 2019 at 14:27 Comment(9)
Quite some code and info. Before digging deeper into this: do you think it's possible to wrap this up into a minimal reproducible example? - Malkamalkah
@Malkamalkah Not really, it doesn't happen consistently in the same place; it has actually taken me a lot of work just to get this far. - Meatball
Sure, it's complicated (and even more so due to some final methods in these classes that would be helpful to override for debugging :-/ ). According to the last stack trace, where processWorkerExit is called, I'd be curious to see whether afterExecute is called (and with what parameters). But without having the code, one can hardly do more than make such guesses... - Malkamalkah
@Malkamalkah Okay, debugging showed me the issue. Basically there is another ES that initially loads the tasks onto this ES, and there is an error somewhere in my code that means the latch is closed too soon because the task count is 0 when it should not be. Then when the first ES finishes, awaitCompletion() returns immediately and shutdown is called, even though tasks on the ES are still submitting tasks. I haven't quite worked out the fix, but at least using StackOverflow to formulate the problem has led to me finding the issue. I will probably close this question. - Meatball
Might be a case of en.wikipedia.org/wiki/Rubber_duck_debugging. In any case, fleshing out the core of an issue can lead to further insights (merely by noticing that it is harder than it should be to flesh it out). I wonder what the "big picture" is in your case (i.e. what should actually be accomplished with these structures). Executors have quite some functionality that allows tweaking their behavior for different use cases... - Malkamalkah
@Malkamalkah this might help - #56617583 - Meatball
@Malkamalkah Found the problem but not the solution. ES1 is loading folders and submitting folders to ES2 to process. It's possible for ES1 to load a few folders and for ES2 to finish processing them all before ES1 has submitted the next folder, so pendingItems temporarily goes to zero, allowing the latch to close. The main code is waiting for ES1 to finish, so when it does, awaitCompletion() returns immediately and shutdown() is called even though there are now pending items, and these would submit new tasks that now won't get run because they are submitted after shutdown() was called. - Meatball
I'd have to allocate far more time to look through the other question, its answers and comments. But it now sounds as if you are still using multiple ESs, despite the recommendation in the other answer to use only one. (I thought that Flow Based Programming might be an alternative, but am not sure.) - Malkamalkah
@Malkamalkah I'm just using two, with one being used with a single thread to load the other, which does the rest of the work. That is simply because if I loaded the executor from the main thread I would have to preload it with all folders. I think I now have a solution to this problem and will paste an answer once checked. - Meatball

For a long time I thought the problem must be with my code, then I started thinking the issue was with ThreadPoolExecutor, but adding debugging to my own copy of runWorker() showed the problem was indeed in my own code.

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                MainWindow.userInfoLogger.severe("-----------------------"+workQueue.size());

From this I could see that the work queue was generally getting longer and matched the value of

MainAnalyserService.pendingItems - noOfWorkerThreads

but at a particular point the two values diverged, and this was when the SongLoader process (which I had mistakenly not really considered) finished. So MainAnalyserService was continuing to submit work, increasing the value of pendingItems, but the work queue size of the Executor was getting smaller.

This led to the realization that shutdown() had been called on the Executor much earlier, but we hadn't noticed because we only check the latch after SongLoader has finished.

And the reason it had shut down was that early on MainAnalyserService was completing the work more quickly than SongLoader was submitting it, so the value of pendingItems temporarily dropped to zero, allowing the latch to be closed.
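
The sequence described above can be reproduced deterministically on a single thread. This is only a sketch of the counter and latch logic from the question, with made-up names (PrematureLatchDemo, submitted), not the real SongKong classes:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class PrematureLatchDemo {
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Called when a task is handed to the executor. */
    void submitted() {
        pendingItems.incrementAndGet();
    }

    /** Called when a task finishes; closes the latch as soon as the count reaches zero. */
    void workDone() {
        if (pendingItems.decrementAndGet() == 0) {
            latch.countDown();   // one-shot: a CountDownLatch cannot be reset
        }
    }

    boolean latchClosed() {
        return latch.getCount() == 0;
    }

    int pending() {
        return pendingItems.get();
    }

    public static void main(String[] args) {
        PrematureLatchDemo d = new PrematureLatchDemo();
        d.submitted();   // loader hands over folder 1
        d.workDone();    // workers finish it before folder 2 arrives, count hits 0
        d.submitted();   // loader hands over folder 2
        // prints "latch closed=true, pending=1"
        System.out.println("latch closed=" + d.latchClosed() + ", pending=" + d.pending());
    }
}
```

Once the latch has been counted down it stays that way, so a later awaitCompletion() returns immediately however much work has been added since.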

The solution is as follows:

Add a boolean flag to indicate when SongLoader has completed, and only allow the latch to be closed once this flag is set.

//Written by the main thread and read by worker threads, so declared volatile
private volatile boolean songLoaderCompleted = false;

public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.logger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);

        if (remainingItems == 0 && songLoaderCompleted)
        {
            MainWindow.logger.severe(">Closing Latch:");
            latch.countDown();
        }
    }
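
The setSongLoaderCompleted() method itself is not shown here. A minimal sketch of one way it could look, under two assumptions that are not in the original code: the flag is volatile (it is written by the main thread and read by workers), and the setter re-checks the counter, since otherwise awaitCompletion() could block forever if the last workDone() ran before the flag was set. CompletionTracker and submitted() are hypothetical stand-ins for the relevant part of MainAnalyserService:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionTracker {
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean songLoaderCompleted = false;

    /** Called before a task is handed to the executor. */
    void submitted() {
        pendingItems.incrementAndGet();
    }

    /** Called by a task when it has finished (or failed). */
    void workDone() {
        if (pendingItems.decrementAndGet() == 0 && songLoaderCompleted) {
            latch.countDown();
        }
    }

    /** Called by the main thread once the loader has submitted everything. */
    void setSongLoaderCompleted() {
        songLoaderCompleted = true;
        // Re-check: all work may already have finished before the flag was set,
        // in which case no further workDone() call will arrive to close the latch.
        if (pendingItems.get() == 0) {
            latch.countDown();
        }
    }

    boolean isComplete() {
        return latch.getCount() == 0;
    }

    void awaitCompletion() throws InterruptedException {
        latch.await();
    }
}
```

Calling countDown() twice is harmless on a latch of size one. The sketch also assumes that a task which submits follow-up work increments the counter for it before calling workDone() for itself.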

Then in the main thread set this flag once SongLoader has completed

 //Start SongLoader
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);

//SongLoader uses a CompletionService when it calls LoadFolderWorkers, so awaitTermination won't return until all
//folder submissions to the MainAnalyserService have completed
songLoaderService.shutdown();
songLoaderService.awaitTermination(10, TimeUnit.DAYS);
MainWindow.userInfoLogger.severe(">Song Loader Finished");

//We're now allowed to consider closing the latch because we know all songs have now been loaded,
//so there is no chance of a false zero
analyserService.setSongLoaderCompleted();

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

//This should be immediate as there should be no tasks still remaining
analyserService.getExecutorService().shutdown();
analyserService.getExecutorService().awaitTermination(10, TimeUnit.DAYS);
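
As a possible alternative to the counter plus flag, java.util.concurrent.Phaser supports a party count that can grow while work is in progress. The sketch below is not what SongKong does; it assumes each task registers its follow-up work before deregistering itself, and the class name and task bodies are invented for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserCompletionDemo {
    /** Runs `roots` tasks that each submit one follow-up task; returns the total number of tasks run. */
    static int run(int roots) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1);   // one party for the submitting thread
        AtomicInteger executed = new AtomicInteger();

        for (int i = 0; i < roots; i++) {
            phaser.register();           // register before handing the task over
            pool.execute(() -> {
                try {
                    executed.incrementAndGet();
                    phaser.register();   // child is registered before the parent deregisters
                    pool.execute(() -> {
                        try {
                            executed.incrementAndGet();
                        } finally {
                            phaser.arriveAndDeregister();
                        }
                    });
                } finally {
                    phaser.arriveAndDeregister();
                }
            });
        }

        // The submitting thread stays registered until here, so the phase cannot
        // advance while it is still adding work, even if the workers catch up.
        phaser.arriveAndAwaitAdvance();

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks executed: " + run(50));
    }
}
```

The submitting thread's own registration plays the role of the songLoaderCompleted flag: the count cannot reach zero until that thread arrives.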
Meatball answered 11/7, 2019 at 7:3 Comment(11)
Many of the "fixes" you are providing here would not be required if you wrapped your unit of work in a Callable and submitted it to the executor instead of handling job scheduling yourself. - Buford
@Buford I do submit the work to the Executor, but the issue is that a unit of work can itself asynchronously submit more work, so I need a way of knowing when all the work is done. - Meatball
CountDownLatch is misused here as well. Use it to keep track of completion directly: new CountDownLatch(numberOfTasks), and countDown upon completion. - Buford
@Buford I don't know the number of tasks in advance, that's the whole point. - Meatball
Sure, still you can submit the unit of work directly to the executor instead of polling from an external queue yourself. - Buford
Also, to get info on task completion I would go for a collection of Futures. - Buford
@Buford Where do I do polling? - Meatball
In your runWorker: from the snippet I conclude that the worker takes tasks from the queue and executes them, while you should submit tasks directly to the executor service. - Buford
@Buford runWorker is not my code, it is part of the standard Java class ThreadPoolExecutor! - Meatball
Is it? Then what does this mean: "but adding debugging to my own version of runWorker() showed the problem was indeed my own code". - Buford
I just created a copy of ThreadPoolExecutor purely so I could add some debugging, because I couldn't understand why my thread pool was terminating prematurely. - Meatball

You are just misusing ExecutorService.

What you are doing (even in your "solution") is this:

  • Submit tasks
  • Wait for them to finish
  • Shutdown
  • Wait again for shutdown to happen (why is that actually?)

What you should do is:

  • Submit tasks
  • Shutdown the executor to not allow any new tasks
  • Await termination - this will block until all tasks are finished, or timeout is reached

You should check the return value of awaitTermination because:

  • If true - all tasks finished before the given timeout
  • If false - not all tasks have finished yet, and you probably should not start your second pool in that case.
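
The submit, shutdown, await sequence above can be sketched as follows. The pool size, timeout and class name are arbitrary choices for illustration. Note that tasks submitted after shutdown() are rejected, so this pattern only fits when no running task needs to submit further work to the same pool:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownThenAwait {
    /** Runs taskCount trivial tasks and reports whether they all finished within the timeout. */
    static boolean runAll(int taskCount, AtomicInteger done) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 1. Submit tasks
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> done.incrementAndGet());
        }

        // 2. Stop accepting new tasks; tasks already queued still run
        pool.shutdown();

        // 3. Block until everything has finished or the timeout expires
        boolean finished = pool.awaitTermination(30, TimeUnit.SECONDS);
        if (!finished) {
            pool.shutdownNow();   // give up: interrupt running tasks, discard queued ones
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger done = new AtomicInteger();
        System.out.println("finished=" + runAll(20, done) + ", done=" + done.get());
    }
}
```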

Also, there are two ways to use a thread pool executor. You can spawn worker threads and let them decide what they are supposed to do, as you did by looping in the worker thread for new tasks.

Or (which I prefer) wrap whatever your job is supposed to do in a separate task (most probably what you have in the loop body) and submit it to the pool as a separate task. ExecutorService will do the scheduling for you.

Buford answered 11/7, 2019 at 7:9 Comment(1)
Sorry, but you don't actually understand what I'm doing here; this answer is incorrect. - Meatball
