Java: ExecutorService that blocks on submission after a certain queue size [duplicate]

I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task holds a significant amount of in-memory data, so I want to be able to limit the number of tasks that are pending at any moment.

If I create ThreadPoolExecutor like this:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Then executor.submit(callable) throws a RejectedExecutionException when the queue fills up and all the threads are already busy.
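A minimal reproduction of that rejection, assuming one worker thread and a queue capacity of one (the class and method names are illustrative only): the first task occupies the worker, the second fills the queue, and the third is rejected by the default AbortPolicy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {
    // Returns true if the third submission is rejected.
    static boolean thirdSubmitIsRejected() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo ends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.submit(blocker); // starts the only worker thread
            executor.submit(blocker); // fills the queue (capacity 1)
            executor.submit(blocker); // no free thread and no queue space
            return false;
        } catch (RejectedExecutionException expected) {
            return true; // thrown by the default AbortPolicy
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + thirdSubmitIsRejected());
    }
}
```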

What can I do to make executor.submit(callable) block when the queue is full and all threads are busy?

EDIT: I tried this:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

It somewhat achieves the effect I want, but in an inelegant way (rejected tasks are run in the calling thread, which blocks the calling thread from submitting more).
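To make that behaviour concrete, here is a small sketch (illustrative names, one worker, queue capacity one) showing that the task which overflows the queue is executed synchronously on the submitting thread, so execute() only returns once that task has finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns true if the overflowing task ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            executor.execute(blocker); // the only worker is now busy
            executor.execute(blocker); // the queue is now full
            // Rejected, so CallerRunsPolicy runs it right here before returning.
            executor.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + overflowRunsOnCaller());
    }
}
```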

EDIT: (5 years after asking the question)

To anyone reading this question and its answers: please don't take the accepted answer as the one correct solution. Please read through all the answers and comments.

Phooey answered 23/12, 2010 at 19:50 Comment(7)
Farrington: I've used a Semaphore before to do exactly that, just like in the answer to the very similar question @axtavt linked to.
Phooey: The question mentioned above also has an answer based on RejectedExecutionHandler.
Scaliger: I think your solution with CallerRunsPolicy is perfect. Why would you call it inelegant?
Phooey: @Scaliger For one thing, you get one more task executing in parallel than numWorkerThreads when the caller thread is also executing a task. But the more important issue is that if the caller thread gets a long-running task, the other threads may sit idle waiting for the next task.
Scaliger: @TahirAkhtar, true; the queue should be long enough that it does not run dry when the caller has to execute a task herself. But I think it is an advantage if one more thread, the caller thread, can be used to execute tasks. If the caller just blocks, the caller's thread would be idle. I use CallerRunsPolicy with a queue three times the capacity of the thread pool and it works nicely and smoothly. Compared to this solution, I would consider tampering with the framework overengineering.
Sybil: @TomWalk +1 Good points. It seems like another difference is that if the task was rejected from the queue and was run by the caller thread, then the caller thread would begin processing a request out of order since it didn't wait its turn in the queue. Certainly, if you've already chosen to use threads then you must handle any dependencies properly, but it's something to keep in mind.
Phooey: Good point @rimsky. As you noted, this shouldn't generally be a concern, as in parallel processing some out-of-order processing is bound to happen sooner or later, since the tasks usually will not take exactly the same time to complete.

I have done this same thing. The trick is to create a BlockingQueue whose offer() method is really a put() (you can use whatever base BlockingQueue implementation you want).

import java.util.concurrent.LinkedBlockingQueue;

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Note that this only works for thread pools where corePoolSize == maxPoolSize, so be careful there (see comments).
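A self-contained sketch of the queue in use, assuming core == max (2 threads) and a queue capacity of 2. The nested queue is a condensed copy of LimitedQueue so the snippet compiles on its own, and the demo names are illustrative. Submitting 20 short tasks makes the producer wait in put() instead of getting a RejectedExecutionException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LimitedQueueDemo {
    // Condensed copy of LimitedQueue: offer() waits for space via put().
    static final class LimitedQueue<E> extends LinkedBlockingQueue<E> {
        LimitedQueue(int maxSize) {
            super(maxSize);
        }

        @Override
        public boolean offer(E e) {
            try {
                put(e);
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Returns how many of the submitted tasks ran to completion.
    static int runAll(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, // core == max
                0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(2));
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            // blocks here whenever both threads are busy and the queue is full
            executor.execute(() -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(20));
    }
}
```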

Springhead answered 23/12, 2010 at 20:56 Comment(7)
Lectionary: Alternatively, you could extend SynchronousQueue to prevent buffering, allowing only direct handoffs.
Rockefeller: Elegant and directly addresses the problem. offer() becomes put(), and put() means "... waiting if necessary for space to become available".
Wideawake: I don't think this is a good idea because it changes the contract of the offer method. The offer method should be a non-blocking call.
Asti: I disagree - this changes the behavior of ThreadPoolExecutor.execute such that if you have corePoolSize < maxPoolSize, the ThreadPoolExecutor logic will never add additional workers beyond the core.
Springhead: @Chris - I'm not sure why you "disagree"? The solution I presented works. You are correct, however, that since the TPE implementation depends on failed offers to add threads beyond the core pool size, this solution won't work well for that. However, that doesn't make this solution invalid, just incompatible with that scenario. (It's unfortunate that the TPE depends on failed offers to add threads beyond the core pool size, as that causes issues in a variety of different scenarios.)
Asti: To clarify - your solution works only as long as you maintain the constraint corePoolSize == maxPoolSize. Without that, it no longer lets ThreadPoolExecutor have its designed behavior. I was looking for a solution to this problem that did not have that restriction; see my alternative answer below for the approach we ended up taking.
Weighbridge: I think this is an excellent solution. It is much simpler than some of the others presented, yet still solves the problem. Sure, it might not work with certain thread pool configurations; however, that does not discount the fact that it works in many situations without needing to resort to ugly hacks. A solution does not have to work for all cases to be good.

The currently accepted answer has a potentially significant problem - it changes the behavior of ThreadPoolExecutor.execute such that if you have a corePoolSize < maxPoolSize, the ThreadPoolExecutor logic will never add additional workers beyond the core.

From ThreadPoolExecutor.execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Specifically, that last 'else' block will never be reached, because the overridden offer() never returns false.
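The effect on pool growth can be checked directly. In this sketch (names illustrative), the pool is configured with corePoolSize 1 and maximumPoolSize 4 around a queue whose offer() blocks like LimitedQueue's. Because execute() never sees a failed offer(), getLargestPoolSize() should stay at the core size.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreOnlyGrowthDemo {
    static int largestPoolSize() {
        // A queue whose offer() waits for space, like LimitedQueue.
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1) {
            @Override
            public boolean offer(Runnable r) {
                try {
                    put(r);
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        };
        // corePoolSize 1 < maximumPoolSize 4
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 4,
                60L, TimeUnit.SECONDS, queue);
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // offer() never returned false, so no non-core worker was ever added
        return executor.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize());
    }
}
```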

A better alternative is to do something similar to what the OP is already doing: use a RejectedExecutionHandler to perform the same put logic:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

There are some things to watch out for with this approach, as pointed out in the comments (referring to this answer):

  1. If corePoolSize == 0, there is a race condition where all threads in the pool may die before the task becomes visible in the queue, leaving the queued task with no thread to run it.
  2. Using an implementation that wraps the queue tasks (not applicable to ThreadPoolExecutor) will result in issues unless the handler also wraps it the same way.

Keeping those gotchas in mind, this solution will work for most typical ThreadPoolExecutors, and will properly handle the case where corePoolSize < maxPoolSize.
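Here is the handler packaged as a class with a small driver, assuming corePoolSize 1, maximumPoolSize 3 and an ArrayBlockingQueue of capacity 2 (all names and sizes are illustrative). One deliberate difference from the snippet above: when the executor is shut down, this version throws RejectedExecutionException instead of silently discarding the task, so a caller of submit() is never left holding a Future that will never complete.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockWhenFullPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            // While running, we only get here after execute() failed both
            // offer() and addWorker(): queue full and pool at its maximum.
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }

    // Illustrative driver: returns {tasks completed, largest pool size seen}.
    static int[] runAll(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3,
                60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2),
                new BlockWhenFullPolicy());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), executor.getLargestPoolSize() };
    }

    public static void main(String[] args) {
        int[] result = runAll(20);
        System.out.println("completed " + result[0] + ", largest pool " + result[1]);
    }
}
```

Because offer() fails normally here, the pool can still grow from the core size toward maximumPoolSize before the handler starts blocking the caller.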

Asti answered 20/8, 2015 at 16:27 Comment(7)
Asti: To whoever downvoted - can you provide some insight? Is there something incorrect / misleading / dangerous in this answer? I'd like the opportunity to address your concerns.
Topmost: I did not downvote, but it appears to be a very bad idea.
Asti: @Topmost - thanks for the link - that answer raises some valid cases that should be known if using this approach, but IMO doesn't make it a "very bad idea" - it still solves an issue present in the currently accepted answer. I've updated my answer with those caveats.
Emotionalism: If the core pool size is 0 and a task is submitted to the executor, the executor will start creating threads if the queue is full so as to handle the task. Then why is it prone to deadlock? I didn't get your point. Could you elaborate?
Asti: @ShirgillFarhanAnsari - it's the case raised in the previous comment. It can happen because adding directly to the queue doesn't trigger creating threads / starting workers. It's an edge case / race condition that can be mitigated by having a non-zero core pool size.
Gula: The Java 8 implementation of execute() takes care of this condition: "If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none." else if (workerCountOf(recheck) == 0) addWorker(null, false);
Vagabondage: I like this; it seems like a cleaner approach than creating my own implementation of LinkedBlockingQueue. But... I've tried it and it spins up eight threads even though I have maxPoolSize set to five.

Here is how I solved this on my end:

(Note: this solution blocks the thread that submits the Callable, which prevents a RejectedExecutionException from being thrown.)

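import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedExecutor extends ThreadPoolExecutor {

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**
     * Submits a task to the execution pool, but blocks while the number of
     * running tasks has reached the bound.
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
        semaphore.acquire();
        try {
            return submit(task);
        } catch (RejectedExecutionException e) {
            // the task never ran, so afterExecute() will not release this permit
            semaphore.release();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        semaphore.release();
    }
}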
Locality answered 26/6, 2014 at 0:45 Comment(5)
Abominate: I assume this doesn't work well for cases where corePoolSize < maxPoolSize ... :|
Planchette: It works for the case where corePoolSize < maxPoolSize. In those cases, the semaphore will be available, but there won't be a thread, and the SynchronousQueue will return false. The ThreadPoolExecutor will then spin up a new thread. The problem with this solution is that it has a race condition. After semaphore.release(), but before the thread finishes execute, submit() will get the semaphore permit. If super.submit() is run before execute() finishes, the job will be rejected.
Locality: @LuísGuilherme But semaphore.release() will never be called before the thread finishes execution, because this call is done in the afterExecute(...) method. Am I missing something in the scenario you are describing?
Planchette: afterExecute is called by the same thread that runs the task, so it's not finished yet. Test it yourself: implement that solution, throw huge amounts of work at the executor, and throw if the work is rejected. You'll notice that, yes, this has a race condition, and it's not hard to reproduce.
Planchette: Go to ThreadPoolExecutor and check the runWorker(Worker w) method. You'll see that things happen after afterExecute finishes, including the unlocking of the worker and the increment of the completed-task count. So you allowed tasks to come in (by releasing the semaphore) without having the bandwidth to process them (by calling processWorkerExit).
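One way to sidestep the race described in these comments, sketched under the assumption of a fixed pool whose internal queue is unbounded (the class name and driver are illustrative, not the answerer's code): the permit is released in a finally block inside the task itself, and because the underlying queue never fills, execute() has nothing to reject even when a permit is freed before the worker thread is ready for more work. Memory stays bounded because at most threads + maxPending tasks can hold a permit at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

class SemaphoreBoundedSubmitter {
    private final ExecutorService pool;
    private final Semaphore permits;

    SemaphoreBoundedSubmitter(int threads, int maxPending) {
        // newFixedThreadPool uses an unbounded LinkedBlockingQueue, so
        // execute() does not reject tasks while the pool is running.
        this.pool = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(threads + maxPending);
    }

    <T> Future<T> submit(Callable<T> task) throws InterruptedException {
        permits.acquire(); // the producer blocks here once the bound is reached
        try {
            return pool.submit(() -> {
                try {
                    return task.call();
                } finally {
                    permits.release(); // freed as soon as the task body ends
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release(); // e.g. after shutdown: do not leak the permit
            throw e;
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    // Illustrative driver: sums 1^2 + ... + n^2 through the bounded submitter.
    static long sumOfSquares(int n) {
        SemaphoreBoundedSubmitter submitter = new SemaphoreBoundedSubmitter(2, 3);
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (int i = 1; i <= n; i++) {
                final int x = i;
                futures.add(submitter.submit(() -> x * x));
            }
            long sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            submitter.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sum: " + sumOfSquares(10));
    }
}
```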

I know this is an old question, but I had a similar issue: new tasks were created very quickly, and if too many accumulated, an OutOfMemoryError occurred because existing tasks were not completed fast enough.

In my case Callables are submitted and I need their results, so I need to store all the Futures returned by executor.submit(). My solution was to put the Futures into a BlockingQueue with a maximum size. Once that queue is full, no more tasks are generated until some are completed (elements removed from the queue). In pseudo-code (reader, generateTask and MyException stand in for application code):

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final BlockingQueue<Future<?>> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            try {
                while (reader.hasNext()) {
                    Callable<?> task = generateTask(reader.next());
                    // if the queue is full, put() blocks until the consumer below
                    // has taken a future, so no further tasks are submitted
                    futures.put(executor.submit(task));
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    };
    taskGenerator.start();

    // consume while tasks are still being generated or futures remain queued;
    // poll() with a timeout (instead of take()) cannot block forever if the
    // generator finishes between the isAlive() check and the call
    while (taskGenerator.isAlive() || !futures.isEmpty()) {
        Future<?> future = futures.poll(100, TimeUnit.MILLISECONDS);
        if (future != null) {
            Object result = future.get(); // waits for this task to complete
            // do something with result
        }
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
Empennage answered 17/7, 2013 at 11:23 Comment(2)
Intricacy: What is the compoundFuture for?
Empennage: That was the original name of the variable, which I did not consistently "rename" for this example.
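A single-threaded variant of the same idea, offered as a sketch rather than the answerer's code: the producer keeps its own deque of at most maxPending futures and, when it is full, waits on the oldest one before submitting more, so neither a separate generator thread nor queue polling is needed. Names are illustrative; results are consumed in submission order, so one slow early task can leave threads idle.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BoundedFuturesDemo {
    // Sums x*x for x in 1..count with at most maxPending (>= 1) futures outstanding.
    static long sumOfSquares(int count, int threads, int maxPending) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Deque<Future<Long>> pending = new ArrayDeque<>();
        long sum = 0;
        try {
            for (long n = 1; n <= count; n++) {
                if (pending.size() == maxPending) {
                    sum += pending.removeFirst().get(); // wait for the oldest task
                }
                final long x = n;
                pending.addLast(executor.submit(() -> x * x));
            }
            while (!pending.isEmpty()) {
                sum += pending.removeFirst().get(); // drain the remaining results
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("sum: " + sumOfSquares(10, 2, 3));
    }
}
```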

How about using the CallerBlocksPolicy class if you are using spring-integration?

This class implements the RejectedExecutionHandler interface, which is a handler for tasks that cannot be executed by a ThreadPoolExecutor.

You can use this policy like this.

executor.setRejectedExecutionHandler(new CallerBlocksPolicy(maxWaitMillis)); // maxWaitMillis (placeholder): maximum time to wait for queue space, in milliseconds

The main difference between CallerBlocksPolicy and CallerRunsPolicy is that, for a rejected task, the former blocks the caller thread while waiting for queue space, whereas the latter runs the task in the caller thread.
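If you cannot depend on Spring Integration, a JDK-only handler in the same spirit is easy to sketch. This is not Spring's source, and TimedBlockingPolicy is an illustrative name: it waits up to maxWaitMillis for queue space using BlockingQueue.offer(e, timeout, unit) and rejects the task if none frees up in time.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TimedBlockingPolicy implements RejectedExecutionHandler {
    private final long maxWaitMillis;

    TimedBlockingPolicy(long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            // wait up to maxWaitMillis for a free queue slot
            if (!executor.getQueue().offer(r, maxWaitMillis, TimeUnit.MILLISECONDS)) {
                throw new RejectedExecutionException("Timed out waiting for queue space");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }

    // Illustrative check: with the only worker stuck and the queue full,
    // the third task waits about 50 ms and is then rejected.
    static boolean rejectsAfterTimeout() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1),
                new TimedBlockingPolicy(50));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocker); // occupies the only worker
            executor.execute(blocker); // fills the queue
            executor.execute(blocker); // waits, then times out
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }
}
```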

Please refer to this code.

Seay answered 4/2, 2021 at 19:28 Comment(1)
Phooey: Looks like a nice option. If it were in a separate utility library, it would be easier to use.

I had a similar problem and solved it using the beforeExecute/afterExecute hooks of ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

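    /**
     * Executes the task if there are enough system resources for it.
     * Otherwise waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        boolean interrupted = false;
        taskLock.lock();
        try {
            // Wait (without spinning) while maxTaskCount tasks are already running
            while (currentTaskCount >= maxTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    // remember the interrupt; re-asserting it here would make
                    // await() throw again immediately and busy-loop
                    interrupted = true;
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
        if (interrupted) {
            t.interrupt(); // restore the worker's interrupt status
        }
    }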

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

This should be good enough for you. By the way, the original implementation was based on task size, because one task could be 100 times larger than another: submitting two huge tasks was killing the box, but running one big task and plenty of small ones was fine. If your I/O-intensive tasks are roughly the same size you could use this class; otherwise just let me know and I'll post the size-based implementation.

P.S. You may want to check the ThreadPoolExecutor javadoc. It's a really nice user guide by Doug Lea on how the class can be customized.

Catalan answered 24/12, 2010 at 2:25 Comment(5)
Phooey: I am wondering what will happen when a thread is holding the lock in beforeExecute(), sees that maxTaskCount < currentTaskCount and starts waiting on the unpaused condition. At the same time another thread tries to acquire the lock in afterExecute() to signal completion of a task. Won't that deadlock?
Phooey: I also noticed that this solution will not block the thread that submits the tasks when the queue gets full, so RejectedExecutionException is still possible.
Catalan: The semantics of the ReentrantLock/Condition classes are similar to what synchronized and wait/notify provide. When the condition's waiting methods are called, the lock is released, so there will be no deadlock.
Catalan: Right, this ExecutorService holds back tasks after submission without blocking the caller thread. The job simply gets submitted and will be processed asynchronously when there are enough system resources for it.
Roborant: This, and any other solution using afterExecute, is subject to the race condition described by @Luís Guilherme in a comment on another answer at https://mcmap.net/q/195636/-java-executorservice-that-blocks-on-submission-after-a-certain-queue-size-duplicate

I have implemented a solution following the decorator pattern, using a semaphore to limit the number of outstanding tasks. You can use it with any Executor and:

  • Specify the maximum number of ongoing tasks
  • Specify the maximum timeout to wait for a task execution permit (if the timeout passes and no permit is acquired, a RejectedExecutionException is thrown)

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}
Tempt answered 19/3, 2018 at 21:53 Comment(0)

I think it is as simple as using an ArrayBlockingQueue instead of a LinkedBlockingQueue.

Ignore me... that's totally wrong. ThreadPoolExecutor calls Queue#offer, not put (which would have the effect you require).

You could extend ThreadPoolExecutor and provide an implementation of execute(Runnable) that calls put in place of offer.

That doesn't seem like a completely satisfactory answer I'm afraid.
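A rough sketch of the execute(Runnable) override suggested above, under loudly stated assumptions: corePoolSize == maximumPoolSize, and all core threads are prestarted with prestartAllCoreThreads(), because putting tasks straight into getQueue() bypasses ThreadPoolExecutor's worker-creation logic. The ThreadPoolExecutor javadoc describes getQueue() as intended primarily for debugging and monitoring, which is part of why this is not a fully satisfactory answer. The class name and driver are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PutOnExecuteThreadPool extends ThreadPoolExecutor {
    PutOnExecuteThreadPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        // execute() below never goes through the normal worker-creation
        // path, so every (core == max) worker must exist up front.
        prestartAllCoreThreads();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // Caveat: this check-then-put can race with a concurrent shutdown().
        if (isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            getQueue().put(command); // block the caller until there is space
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }

    // Illustrative driver: submit() delegates to execute(), so it blocks too.
    static int runAll(int tasks) {
        PutOnExecuteThreadPool executor = new PutOnExecuteThreadPool(2, 2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```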

Gnat answered 23/12, 2010 at 19:59 Comment(0)
