ThreadPoolExecutor Block When its Queue Is Full?

I am trying to execute lots of tasks using a ThreadPoolExecutor. Below is a hypothetical example:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

The problem is that I quickly get a java.util.concurrent.RejectedExecutionException, because tasks are submitted faster than the three threads can drain the three-slot work queue. However, the behavior I'm looking for is to have the main thread block until there is room in the queue. What is the best way to accomplish this?

Surra asked 10/8, 2010 at 4:24. Comments:
Take a look at this question: #2001586 - Drinkwater
This answer to another question suggests using a custom BlockingQueue subclass which blocks on offer() by delegating to put(). I think that ends up working more or less the same as the RejectedExecutionHandler which calls getQueue().put(). - Madcap
Putting directly into the queue would be incorrect, as explained in this answer: https://mcmap.net/q/195637/-threadpoolexecutor-block-when-its-queue-is-full - Fisticuffs
@SumitJain Read that answer more carefully; only one of the three objections raised in that answer applies to the approach suggested in @Robert Tupelo-Schneck's comment. By invoking put() from within the queue itself, you don't access the queue via getQueue() (objection #3), and the object you're putting is already properly wrapped if necessary (objection #2). You're still at risk of deadlock if all your threads die before the item comes off the queue, but that may be a risk most people looking for this particular solution would be willing to assume. - Abbottson
Possible duplicate of "How to make ThreadPoolExecutor's submit() method block if it is saturated?" - Rentsch

In some very narrow circumstances, you can implement a java.util.concurrent.RejectedExecutionHandler that does what you need.

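RejectedExecutionHandler block = new RejectedExecutionHandler() {
  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
      executor.getQueue().put(r); // blocks the caller until the queue has room
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
    }
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);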

Now, this is a very bad idea, for the following reasons:

  • It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep-alive time.
  • The task is not wrapped the way your Executor may expect. Lots of executor implementations wrap their tasks in some sort of tracking object before execution. Look at the source of yours.
  • Adding via getQueue() is strongly discouraged by the API, and may be prohibited at some point.

An almost-always-better strategy is to install ThreadPoolExecutor.CallerRunsPolicy, which throttles your app by running the task on the thread that is calling execute().
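For comparison, here is a minimal sketch of installing CallerRunsPolicy. It assumes a one-thread pool with a one-slot queue, chosen only so that saturation happens immediately; the names CallerRunsDemo and run are hypothetical. The first task holds the only worker until a latch is released, the second fills the queue, and the third is rejected and therefore runs synchronously on the submitting thread.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    // Submits three tasks to a saturated pool and returns the names of the threads that ran them.
    static List<String> run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturated: run on the submitting thread
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);

        // Task 1 occupies the only worker until the latch is released.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.add(Thread.currentThread().getName());
        });
        // Task 2 fills the one-slot queue.
        pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
        // Task 3 is rejected, so CallerRunsPolicy runs it here, synchronously.
        pool.execute(() -> ranOn.add(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return ranOn;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // the first entry is the caller's thread name
    }
}
```

The throttling is a side effect of this design: while the caller is busy running a rejected task, it cannot submit more work.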

However, sometimes a blocking strategy, with all its inherent risks, is really what you want. I'd say that is the case under these conditions:

  • You only have one thread calling execute()
  • You have to (or want to) have a very small queue length
  • You absolutely need to limit the number of threads running this work (usually for external reasons), and a caller-runs strategy would break that.
  • Your tasks are of unpredictable size, so caller-runs could introduce starvation if the pool was momentarily busy with 4 short tasks and your one thread calling execute got stuck with a big one.

So, as I say, it's rarely needed and can be dangerous, but there you go.

Good luck.

Talton answered 19/8, 2010 at 3:47. Comments:
A very well thought-out response. I take minor issue with your condition "You have to (or want to) have a very small queue length." You may not be able to predict how many tasks a given job will queue. Maybe you're running a daily job that processes data from some DB, and on Monday there are 500 records to process but on Tuesday there are 50,000. You've gotta set an upper bound on your queue so that you won't blow your heap when a big job comes through. In that case there's no harm in waiting for some tasks to complete before queueing more. - Bezonian
"It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep alive time." Can't deadlock be avoided completely by setting the min pool size to a value greater than zero? Every other reason is fallout of Java not having built-in support for blocking puts to executor queues, which is interesting, because it seems to be a pretty reasonable strategy. I wonder what the rationale is. - Sodium
Perhaps another condition for a blocking strategy is when the order of execution is important. CallerRunsPolicy means that the rejected task will likely be executed before other pending items in the executor. - Mcswain
@TimPote the current implementation of execute() as of Java 8 takes care of that condition as well: "If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none." Darren, do you see any issues with this approach in Java 8 as well? - Favianus

What you need to do is wrap your ThreadPoolExecutor in an Executor that explicitly limits the number of concurrently executing operations inside it:

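// Nested class; needs java.util.concurrent.{Executor, RejectedExecutionException, Semaphore}
private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire(); // blocks the caller once concurrentTasksLimit tasks are in flight
        } catch (InterruptedException e) {
            // restore the interrupt flag and report the task as not accepted
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        try {
            delegate.execute(wrapped);
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never ran, so give its permit back
            throw e;
        }
    }
}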

Set concurrentTasksLimit to the threadPoolSize + queueSize of your delegate executor and it will pretty much solve your problem.

Hallam answered 16/5, 2018 at 1:42. Comments:
Nice and smooth. Thank you! - Biotin
There is a delay between semaphore.release() executing and the pool thread actually becoming free. In that window the ThreadPoolExecutor can still invoke the rejection handler: pool thread calls semaphore.release() -> main thread calls semaphore.acquire() -> main thread calls delegate.execute() -> only then does the pool thread become free for another Runnable. - Oleander

You could use a semaphore to block threads from going into the pool.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Some gotchas:

  • Only use this pattern with a fixed thread pool. The queue is unlikely to be full often, so threads beyond the core pool size won't be created. Check out the Java docs on ThreadPoolExecutor for more details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html (there is a way around this, but it is out of the scope of this answer).
  • Queue size should be higher than the number of core threads. If we were to make the queue size 3, what would end up happening is:

    • T0: all three threads are doing work, the queue is empty, no permits are available.
    • T1: Thread 1 finishes, releases a permit.
    • T2: Thread 1 polls the queue for new work, finds none, and waits.
    • T3: Main thread submits work into the pool, thread 1 starts work.

    In the example above, the main thread ends up blocking thread 1. It may seem like a short period, but multiply its frequency over days and months, and these short periods add up to a large amount of wasted time.

Truant answered 26/3, 2018 at 0:28. Comments:
Thread 1 is already blocked at time T2 when it finds the queue to be empty. I'm not sure I understood your point about the main thread blocking that thread. - Serg
@Serg "Thread 1 is already blocked at time T2 when it finds the queue to be empty." Right, and because it's the main thread's responsibility to put work into the queue, you can deduce that the main thread is blocking Thread 1. - Truant

This is what I ended up doing:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

Jordanna answered 29/5, 2018 at 21:4

A fairly straightforward option is to wrap your BlockingQueue with an implementation that calls put(..) when offer(..) is invoked:

public class BlockOnOfferAdapter<E> implements BlockingQueue<E> {

    private final BlockingQueue<E> delegate;

(..)

    @Override
    public boolean offer(E o) {
        try {
            delegate.put(o); // waits for free capacity instead of failing fast
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

(.. implement all other methods simply by delegating ..)

}

This works because put(..) is specified to wait for capacity when the queue is full, as the BlockingQueue Javadoc states:

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

No catching of RejectedExecutionException or complicated locking necessary.
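A shorter variant of the same idea, mentioned in the question's comments, is to subclass an existing bounded queue instead of delegating every method. The following is a minimal sketch, assuming ArrayBlockingQueue as the base class and hypothetical names (BlockingOfferQueue, runTasks). It relies on ThreadPoolExecutor.execute() calling offer() on its work queue, which is how the JDK implementation behaves but is not part of the documented contract. Core and maximum pool sizes are set equal because the executor only adds threads beyond the core size when offer() returns false, which here happens only on interrupt.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingOfferQueueDemo {

    // Hypothetical helper: a bounded queue whose offer() waits for space like put().
    static class BlockingOfferQueue<E> extends ArrayBlockingQueue<E> {
        BlockingOfferQueue(int capacity) {
            super(capacity);
        }

        @Override
        public boolean offer(E e) {
            try {
                put(e); // waits for free capacity instead of returning false
                return true;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false; // the executor then treats the task as rejected
            }
        }
    }

    static int runTasks(int taskCount) throws InterruptedException {
        // core == max, because offer() only returns false on interrupt,
        // so the pool would never grow past its core size anyway.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new BlockingOfferQueue<>(3));
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet); // blocks here while the queue is full
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(1000)); // prints 1000
    }
}
```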

Neolamarckism answered 5/11, 2019 at 9:34. Comments:
This depends on an implementation detail of ThreadPoolExecutor and is not guaranteed by the JDK spec. - Yggdrasil

Here is my code snippet for this case:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
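        catch ( RejectedExecutionException e ) {
            // Thread pool is full (or shut down)
            if ( threadPool.isShutdown() ) {
                throw e; // a shut-down pool will never accept the task
            }
            try {
                // Block until one of the tasks finishes. The timeout guards against a
                // lost wakeup when a task completes before we start waiting.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait( 100 );
                }
            }
            catch ( InterruptedException interrupted ) {
                // restore the interrupt flag and give up without submitting
                Thread.currentThread().interrupt();
                break;
            }
        }
    } while ( !accepted );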
}

threadPool is an already-initialized java.util.concurrent.ExecutorService field of the enclosing ThreadPool class.

Oversubtlety answered 31/5, 2017 at 8:11

I solved this problem using a custom RejectedExecutionHandler, which simply blocks the calling thread for a little while and then tries to submit the task again:

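public class BlockWhenQueueFull implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // A shut-down pool rejects forever; retrying would only recurse.
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to retry", interruptedException);
        }

        executor.execute(r); // re-enters this handler if the pool is still full
    }
}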

This class can be used as the thread pool's RejectedExecutionHandler like any other. Applied to the example from the question:

def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

The only downside I see is that the calling thread might be blocked slightly longer than strictly necessary (up to 250 ms). For many short-running tasks, perhaps decrease the wait time to 10 ms or so. Moreover, because execute() re-enters this handler every time the retry is rejected, very long waits for a thread to become available (hours) might result in a stack overflow.

Nevertheless, I personally like this method. It's compact, easy to understand, and works well. Am I missing anything important?
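If the recursive re-entry mentioned above is a concern, the same sleep-and-retry idea can run as a loop on the submitting thread while the executor keeps its default AbortPolicy. This is a minimal sketch; executeWithRetry and runTasks are hypothetical helper names, and the 10 ms back-off is an arbitrary assumption. The loop also gives up once the pool has been shut down, since a shut-down pool will never accept the task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryingSubmitter {

    // Hypothetical helper: retries execute() in a loop on the calling thread,
    // so waiting for capacity never grows the call stack.
    static void executeWithRetry(ExecutorService pool, Runnable task, long waitMs)
            throws InterruptedException {
        while (true) {
            try {
                pool.execute(task);
                return;
            } catch (RejectedExecutionException e) {
                if (pool.isShutdown()) {
                    throw e; // a shut-down pool will never accept the task
                }
                Thread.sleep(waitMs); // pool saturated: back off, then retry
            }
        }
    }

    static int runTasks(int taskCount) throws InterruptedException {
        // Default AbortPolicy, so saturation surfaces as RejectedExecutionException.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.HOURS, new ArrayBlockingQueue<>(3));
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executeWithRetry(pool, done::incrementAndGet, 10);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(100)); // prints 100
    }
}
```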

Thurible answered 6/11, 2017 at 22:8

Ok, old thread, but this is what I found when searching for a blocking thread executor. My code tries to acquire a semaphore permit when a task is submitted to the task queue; this blocks if there are no permits left. As soon as a task is done, its permit is released by the task decorator. The scary part is that a permit could be lost, but that could be solved with, for example, a timed job that periodically resets the semaphore.

So here my solution:

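import java.util.concurrent.Callable
import java.util.concurrent.Future
import java.util.concurrent.Semaphore
import org.springframework.core.task.TaskDecorator
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    // One semaphore per executor instance; each permit is one task in flight.
    private val semaphore = Semaphore(concurrency)

    init {
        // Wrap every task so that it gives its permit back when it finishes.
        setTaskDecorator(SemaphoreTaskDecorator(semaphore))
    }

    override fun <T> submit(task: Callable<T>): Future<T> {
        semaphore.acquire() // blocks while `concurrency` tasks are in flight
        try {
            return super.submit(task)
        } catch (e: RuntimeException) {
            semaphore.release() // the task was rejected, so it will never release its permit
            throw e
        }
    }
}

private class SemaphoreTaskDecorator(private val semaphore: Semaphore) : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        return Runnable {
            try {
                runnable.run()
            } finally {
                semaphore.release()
            }
        }
    }
}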

Heliacal answered 23/4, 2020 at 14:37. Comments:
This doesn't take into account the void execute(Runnable command) method, which still goes to the queue without blocking. - Scaffold

One could override ThreadPoolExecutor.execute(command) to use a Semaphore, e.g.:

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * The setup answering the question needs to have:
 *
 * permits = 3
 * corePoolSize = permits (i.e. 3)
 * maximumPoolSize = corePoolSize (i.e. 3)
 * workQueue = anything other than null
 *
 * With this setup workQueue won’t actually be used except
 * to check if it’s empty, which should always be the case.
 * Any value of maximumPoolSize larger than permits will have
 * no effect, because at any moment no more than permits calls to
 * super.execute() will be allowed by the semaphore.
 */
public class ExecutionBlockingThreadPool extends ThreadPoolExecutor {
  private final Semaphore semaphore;

  // constructor setting super(…) parameters and initializing semaphore
  //
  // Below is a bare minimum constructor; using
  // corePoolSize = maximumPoolSize = permits
  // allows one to use SynchronousQueue because I expect
  // nothing other than isEmpty() to be called on it; it also
  // allows for using 0L SECONDS because no more than
  // corePoolSize threads should ever be created.
  public ExecutionBlockingThreadPool(int corePoolSize) {
    super(corePoolSize, corePoolSize, 0L, SECONDS, new SynchronousQueue<Runnable>());
    semaphore = new Semaphore(corePoolSize, true);
  }

  @Override
  public void execute(Runnable command) {
    try {
      semaphore.acquire(); // blocks while all permits are in use
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
    }
    try {
      super.execute(() -> {
        try {
          command.run();
        } finally {
          semaphore.release();
        }
      });
    } catch (RejectedExecutionException e) {
      semaphore.release(); // the task never ran, so return its permit
      throw e;
    }
  }
}

Scaffold answered 4/4, 2022 at 15:58

You can implement a RejectedExecutionHandler and receive every task that is rejected when the queue is full. By default, executors use the AbortPolicy; with a custom handler you can add these tasks back to the queue or do whatever else you choose.

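import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorRejectedTaskHandlerFixedThreadPool {
public static void main(String[] args) throws InterruptedException {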

    //maximum queue size : 2
    BlockingQueue<Runnable> blockingQueue =
            new LinkedBlockingQueue<Runnable>(2);


    CustomThreadPoolExecutor executor =
            new CustomThreadPoolExecutor(4, 5, 5, TimeUnit.SECONDS,
                    blockingQueue);

    RejectedTaskHandler rejectedHandler = new RejectedTaskHandler();
    executor.setRejectedExecutionHandler(rejectedHandler);
    //submit 20 tasks for execution
    //Note: only 7 tasks (max pool size 5 + queue size 2) are accepted; the rest are rejected because the queue overflows
    for (int i = 0; i < 20; i++) {
        executor.execute(new Task());
    }
    executor.shutdown(); // without this, the idle core threads keep the JVM alive
    System.out.println("Thread name " + Thread.currentThread().getName());


}

static class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread - " + Thread.currentThread().getName() + " performing its job");
    }
}


static class RejectedTaskHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task rejected" + r.toString());

    }
}


public static class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTime, TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

    }
}

}

Brian answered 11/9, 2022 at 11:8

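Spring Integration already provides a solution for this: CallerBlocksPolicy (org.springframework.integration.util.CallerBlocksPolicy), whose constructor argument is the maximum time, in milliseconds, to wait for space in the queue.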

Code Example:

var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerBlocksPolicy(99999));
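If adding the spring-integration dependency just for this is not an option, a plain-JDK handler with the same block-then-give-up behavior can be sketched as below. TimedBlockPolicy and runTasks are hypothetical names and this is not Spring's source. It still inserts through getQueue(), so the caveats from the first answer about bypassing execute() apply, but the bounded wait turns a potential indefinite hang into a RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimedBlockPolicy implements RejectedExecutionHandler {

    private final long maxWaitMs;

    public TimedBlockPolicy(long maxWaitMs) {
        this.maxWaitMs = maxWaitMs;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            // Wait up to maxWaitMs for a free slot instead of blocking forever.
            if (!executor.getQueue().offer(r, maxWaitMs, TimeUnit.MILLISECONDS)) {
                throw new RejectedExecutionException("Timed out waiting for queue space");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }

    static int runTasks(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2),
                new TimedBlockPolicy(10_000));
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet); // blocks inside the handler when saturated
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(1000)); // prints 1000
    }
}
```

The pool parameters mirror the example above (2 core threads, 5 maximum, 1-second keep-alive); the queue capacity of 2 is an assumption, since blockingQueue is not shown.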

Kuroshio answered 27/4 at 18:7
