How to make ThreadPoolExecutor's submit() method block if it is saturated?

I want to create a ThreadPoolExecutor such that when it has reached its maximum size and the queue is full, the submit() method blocks when trying to add new tasks. Do I need to implement a custom RejectedExecutionHandler for that or is there an existing way to do this using a standard Java library?

Varityper asked 4/1, 2010 at 17:59 Comment(3)
Is what you want anything like the ArrayBlockingQueue's offer() method?Cloninger
possible duplicate of Java: ExecutorService that blocks on submission after a certain queue sizeAdige
@Adige I disagree. This Q&A looks more valuable (in addition to being older).Artamas
48

One of the possible solutions I've just found:

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire(); // blocks when 'bound' tasks are already in flight
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release(); // free a slot once the task has run
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never started, so return its permit
            throw e;
        }
    }
}
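
For illustration, a minimal usage sketch (the pool sizes and the bound of 14 = 10 queue slots + 4 threads are assumed values; see the comments below for the caveats around choosing the bound):

ExecutorService pool = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10));
BoundedExecutor bounded = new BoundedExecutor(pool, 14);

// blocks once 14 tasks are in flight; a rare RejectedExecutionException is
// still possible with a bounded queue (see the comments below)
bounded.submitTask(() -> System.out.println("working"));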

Are there any other solutions? I'd prefer something based on RejectedExecutionHandler since it seems like a standard way to handle such situations.

Varityper answered 4/1, 2010 at 18:20 Comment(14)
When you catch RejectedExecutionException, should this not be re-thrown after the semaphore.release(); such that the caller still finds out about the problem?Pulverulent
@Varityper I am using ThreadPoolExecutor with these settings: core thread pool size - 2, max thread pool size - 10. Using BoundedExecutor, the ThreadPoolExecutor exec does not spawn a new thread when the initial 2 threads are busy. When I don't use BoundedExecutor, it spawns a new thread when the 2 core pool threads are busy.Delimitate
Is there a race condition here between the point when the semaphore is released in the finally clause and the semaphore is acquired?Rochet
@ArtemShnayder I think a task can release the semaphore and thus unblock an acquire() and allow a new task to be submitted just before the thread is put on the pool, causing a rejection. You meant that?Hydroxyl
As mentioned above, this implementation is flawed because the semaphore is released before the task completes. It would be better to use method java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)Runway
@FelixM: using java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable) will not solve the problem because afterExecute is called immediately after task.run() in java.util.concurrent.ThreadPoolExecutor#runWorker(Worker w), before taking the next element from the queue (looking at the source code of openjdk 1.7.0.6).Counterfactual
But it seems to me that a rejection can only occur if one has bound > queueCapacity (in particular, if one naively takes bound = queueCapacity + coreThreadCount). So one should simply use bound = queueCapacity. Then actually one would like to release the semaphore as soon as possible after the element has been taken from the queue; so beforeExecute would be a better place for releasing it, wouldn't it?Counterfactual
This answer is from the book Java Concurrency in Practice by Brian GoetzYellowish
^ You mean that an answer from Java Concurrency in Practice is flawed? I agree with @Jaan, I guess that should work.Rawboned
The answer is inherently flawed in that there is a chance you will still get a rejection exception, albeit probably a very rare one. This is because, as others have mentioned, afterExecute is not the last thing that gets executed before worker cleanup, which you can clearly see in the code. The semaphore is thus just a rough but mostly accurate representation of the queue + pool. Also, if you set the bound below the queue + pool size and the executor is not used in other places, in theory this is less likely to happen.Swerve
This answer is not entirely correct, and neither are the comments. This piece of code indeed comes from Java Concurrency in Practice, and it is correct if you take its context into account. The book clearly states, literally: "In such an approach, use an unbounded queue (...) and set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow". With an unbounded queue, tasks will never be rejected, so rethrowing the exception is entirely useless! Which is, I believe, also the reason why throw e; is NOT in the book. JCIP is correct!Suberin
Specifically, this answer is based on Listing 8.4 of JCIP. It is on page 176 of my copy.Adrastus
@Suberin When you set the bound on the semaphore to be equal to the pool size plus the number of queued tasks and do the tests, this will still throw the RejectedExecutionException.Spiderwort
@Runway you wrote "this implementation is flawed because the semaphore is released before the task completes." Why does that matter? It's not specified what the bound is, and if it's the queue limit, then releasing the semaphore when it runs is correct, as it's no longer in the queue. What am I missing?Winglet
31

You can use ThreadPoolExecutor and a BlockingQueue:

import java.util.concurrent.*;

public class ImageManager {
    // assumed example values; choose sizes to fit your workload
    private static final int blockQueueSize = 20;
    private static final int numOfThread = 4;

    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private void downloadThumbnail(String fileListPath) {
        executorService.submit(new YourRunnable()); // YourRunnable is a placeholder for your task
    }
}
Foster answered 13/4, 2011 at 22:15 Comment(7)
I'd just like to say that this was an insanely quick and easy solution to implement that worked very well!Tiphani
This runs rejected tasks on the submitting thread. Which functionally does not meet the OP's requirements.Polyclinic
How do you implement this with a callable??Pitchfork
This does run the task "in the calling thread" instead of blocking to put it on the queue, which can have some adverse effects, like if multiple threads call it this way, more than "queue size" jobs will be running, and if the task happens to take longer than expected, your "producing" thread might not keep the executor busy. But worked great here!Bookseller
Downvoted: this does not block when the TPE is saturated. This is just an alternative, not a solution.Suberin
Upvoted: this pretty much fits the 'design of TPE' and 'naturally blocks' client threads by giving them overflow tasks to do. This should cover most use cases, but not all of them of course, and you should understand what is going on under the hood.Crashland
Would be a good idea when having many worker threads + queue capacity and max-time-guaranteed tasks.Carlenecarleton
13

You should use the CallerRunsPolicy, which executes the rejected task in the calling thread. This way, it can't submit any new tasks to the executor until that task is done, at which point there will be some free pool threads or the process will repeat.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

From the docs:

Rejected tasks

New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
  2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
  3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
  4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

Also, make sure to use a bounded queue, such as ArrayBlockingQueue, when calling the ThreadPoolExecutor constructor. Otherwise, nothing will get rejected.
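
For example, a sketch of such a configuration (the sizes are assumed values):

int poolSize = 4;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(poolSize),   // bounded queue, so saturation is possible
        new ThreadPoolExecutor.CallerRunsPolicy());   // rejected tasks run on the submitting thread

A rejected task then runs on the submitting thread, which naturally stops it from submitting more work until that task is done.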

Edit: in response to your comment, set the size of the ArrayBlockingQueue to be equal to the max size of the thread pool and use the AbortPolicy.

Edit 2: Ok, I see what you're getting at. What about this: override the beforeExecute() method to check that getActiveCount() doesn't exceed getMaximumPoolSize(), and if it does, sleep and try again?

Valverde answered 4/1, 2010 at 18:7 Comment(5)
I want to have number of concurrently executed tasks to be strictly bounded (by the number of threads in Executor), this is why I can't allow caller threads to execute these tasks themselves.Varityper
AbortPolicy would cause caller thread to receive a RejectedExecutionException, while I need it to just block.Varityper
I'm asking for blocking, not sleep&polling ;)Varityper
@danben: Don't you mean CallerRunsPolicy?Bibby
The problem with the CallerRunsPolicy is that if you have a single-thread producer you will often have threads not being used if a long running task happens to get rejected (because the other tasks in the thread pool will be finished while the long running task is still running).Swerve
8

I know it is a hack, but in my opinion the cleanest hack among those offered here ;-)

Because ThreadPoolExecutor uses the blocking queue's "offer" instead of "put", let's override the behaviour of "offer" in the blocking queue:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task); // block until space is available instead of failing fast
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            throw new RuntimeException(e);
        }
        return true;
    }
}

// Note: since offer() never fails, the pool will never grow beyond its core size.
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack<Runnable>(5));

I tested it and it seems to work. Implementing some timeout policy is left as a reader's exercise.
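
For example, one possible timeout variant could look like this sketch (the 30-second limit is an assumed value): a timed offer that, on timeout, returns false and so hands the task back to the executor's normal rejection path.

class TimedBlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    TimedBlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            // Wait up to 30 seconds for free space (assumed timeout). Returning
            // false lets the executor try to grow the pool and, failing that,
            // fall back to its RejectedExecutionHandler.
            return this.offer(task, 30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}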

Mikkimiko answered 10/3, 2015 at 15:38 Comment(1)
See https://mcmap.net/q/195636/-java-executorservice-that-blocks-on-submission-after-a-certain-queue-size-duplicate for a cleaned up version of this. I agree, it's the cleanest way to do it.Nickelodeon
7

The following class wraps around a ThreadPoolExecutor and uses a Semaphore to block when the work queue is full:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class BlockingExecutor {

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl(final Runnable command) throws InterruptedException {
        semaphore.acquire(); // blocks when the virtual queue is full
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute(Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

This wrapper class is based on a solution given in the book Java Concurrency in Practice by Brian Goetz. The solution in the book only takes two constructor parameters: an Executor and a bound used for the semaphore. This is shown in the answer given by Fixpoint. There is a problem with that approach: it can get in a state where the pool threads are busy, the queue is full, but the semaphore has just released a permit. (semaphore.release() in the finally block). In this state, a new task can grab the just released permit, but is rejected because the task queue is full. Of course this is not something you want; you want to block in this case.

To solve this, we must use an unbounded queue, as JCiP clearly mentions. The semaphore acts as a guard, giving the effect of a virtual queue size. This has the side effect that it is possible that the unit can contain maxPoolSize + virtualQueueSize + maxPoolSize tasks. Why is that? Because of the semaphore.release() in the finally block. If all pool threads call this statement at the same time, then maxPoolSize permits are released, allowing the same number of tasks to enter the unit. If we were using a bounded queue, it would still be full, resulting in a rejected task. Now, because we know that this only occurs when a pool thread is almost done, this is not a problem. We know that the pool thread will not block, so a task will soon be taken from the queue.

You are able to use a bounded queue though. Just make sure that its size equals virtualQueueSize + maxPoolSize. Greater sizes are useless; the semaphore will prevent more items from getting in. Smaller sizes will result in rejected tasks. The chance of tasks getting rejected increases as the size decreases. For example, say you want a bounded executor with maxPoolSize=2 and virtualQueueSize=5. Then take a semaphore with 5+2=7 permits and an actual queue size of 5+2=7. The real number of tasks that can be in the unit is then 2+5+2=9. When the executor is full (5 tasks in queue, 2 in thread pool, so 0 permits available) and ALL pool threads release their permits, then exactly 2 permits can be taken by tasks coming in.
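
In code, the maxPoolSize=2, virtualQueueSize=5 example maps onto the class above roughly like this (the keep-alive time and thread factory are assumed values):

BlockingExecutor executor = new BlockingExecutor(
        5,                    // virtualQueueSize: semaphore gets 5 + 2 = 7 permits
        2, 2,                 // corePoolSize, maxPoolSize
        60, TimeUnit.SECONDS, // keepAliveTime (assumed)
        Executors.defaultThreadFactory());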

Now the solution from JCiP is somewhat cumbersome to use as it doesn't enforce all these constraints (unbounded queue, or bounded with those math restrictions, etc.). I think that this only serves as a good example to demonstrate how you can build new thread safe classes based on the parts that are already available, but not as a full-grown, reusable class. I don't think that the latter was the author's intention.

Suberin answered 27/8, 2015 at 9:37 Comment(0)
6

Hibernate has a BlockPolicy that is simple and may do what you want:

See: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
Lakh answered 4/1, 2011 at 18:59 Comment(4)
On second thought, this is a pretty bad idea. I don't recommend you use it. For good reasons see here: #3446511Lakh
Also, this is not using the "standard Java library", as per the OP's request. Delete?Bibby
Woah, that is so ugly. Basically this solution interferes with the internals of TPE. The javadoc for ThreadPoolExecutor even says literally: "Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged.". That this is available in a library that is so widely known, is absolutely sad to see.Suberin
com.amazonaws.services.simpleworkflow.flow.worker.BlockCallerPolicy is similar.Emsmus
5

The BoundedExecutor answer quoted above from Java Concurrency in Practice only works correctly if you use an unbounded queue for the Executor, or the semaphore bound is no greater than the queue size. The semaphore is state shared between the submitting thread and the threads in the pool, making it possible to saturate the executor even if queue size < bound <= (queue size + pool size).

Using CallerRunsPolicy is only valid if your tasks don't run forever (otherwise your submitting thread would remain stuck in rejectedExecution forever), and it is a bad idea if your tasks take a long time to run, because the submitting thread can't submit any new tasks or do anything else while it's running a task itself.

If that's not acceptable then I suggest checking the size of the executor's bounded queue before submitting a task. If the queue is full, then wait a short time before trying to submit again. The throughput will suffer, but I suggest it's a simpler solution than many of the other proposed solutions and you're guaranteed no tasks will get rejected.
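
A sketch of that idea (the retry interval is an assumed value; note that the check-then-act is not atomic, so with multiple producers an occasional rejection is still possible, as a comment below points out):

void submitWithBackoff(ThreadPoolExecutor executor, Runnable task) throws InterruptedException {
    // Poll the bounded work queue for free space before submitting.
    while (executor.getQueue().remainingCapacity() == 0) {
        Thread.sleep(100); // assumed retry interval
    }
    executor.execute(task); // may still be rejected if another producer wins the race
}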

Presbyterate answered 24/11, 2011 at 9:3 Comment(1)
I'm not sure how checking the queue length before submitting guarantees no rejected tasks in a multi-threaded environment with multiple task producers. That doesn't sound thread-safe.Rodgers
1

You can use a custom RejectedExecutionHandler like this:

ThreadPoolExecutor tp = new ThreadPoolExecutor(
        core_size,          // core size
        max_handlers,       // max size
        timeout_in_seconds, // idle timeout
        TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // This will block if the queue is full
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                    System.err.println(e.getMessage());
                }
            }
        });
Clupeid answered 14/1, 2016 at 11:25 Comment(1)
The docs for the getQueue() explicitly mention that Access to the task queue is intended primarily for debugging and monitoring.Parthen
0

Create your own blocking queue to be used by the Executor, with the blocking behavior you are looking for, while always returning available remaining capacity (ensuring the executor will not try to create more threads than its core pool, or trigger the rejection handler).

I believe this will get you the blocking behavior you are looking for. A rejection handler will never fit the bill, since that indicates the executor cannot perform the task. What I could envision there is that you get some form of 'busy waiting' in the handler. That is not what you want; you want a queue for the executor that blocks the caller...

Necrolatry answered 4/1, 2010 at 18:55 Comment(4)
ThreadPoolExecutor uses the offer method to add tasks to its queue. If I created a custom BlockingQueue that blocks on offer, that would break the BlockingQueue contract.Varityper
@Shooshpanchick, that would break BlockingQueue's contract. So what? If you are so keen, you can explicitly enable the behavior during submit() only (it will take a ThreadLocal).Dorsad
See also this answer to another question which spells out this alternative.Breadroot
Is there a reason why ThreadPoolExecutor was implemented to use offer and not put (the blocking version)? Also, if there were a way for the client code to tell which one to use when, a lot of people trying to hand-roll custom solutions would have been relieved.Hix
0

To avoid issues with @FixPoint's solution, one could use ListeningExecutorService and release the semaphore in onSuccess and onFailure inside a FutureCallback.
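
A sketch of that approach, assuming Guava (com.google.common.util.concurrent) is on the classpath; the bound of 10 and pool size of 4 are assumed values, and as a comment below notes, the callback still runs before worker cleanup, so rare rejections would still need handling with a bounded queue:

Semaphore semaphore = new Semaphore(10);
ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

void submitBounded(Runnable task) throws InterruptedException {
    semaphore.acquire(); // blocks when the bound is reached
    ListenableFuture<?> future = exec.submit(task);
    Futures.addCallback(future, new FutureCallback<Object>() {
        @Override public void onSuccess(Object result) { semaphore.release(); }
        @Override public void onFailure(Throwable t) { semaphore.release(); }
    }, MoreExecutors.directExecutor());
}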

Ethelynethene answered 15/4, 2014 at 23:0 Comment(1)
That has the same inherent problems as just wrapping the Runnable as those methods are still called before worker cleanup in the normal ThreadPoolExecutor. That is you will still need to handle rejection exceptions.Swerve
0

I recently found this question while having the same problem. The OP does not say so explicitly, but we do not want to use a RejectedExecutionHandler which executes the task on the submitter's thread, because this will under-utilize the worker threads if the task is a long-running one.

Reading all the answers and comments, in particular the flawed solution with the semaphore or using afterExecute I had a closer look at the code of the ThreadPoolExecutor to see if there is some way out. I was amazed to see that there are more than 2000 lines of (commented) code, some of which make me feel dizzy. Given the rather simple requirement I actually have --- one producer, several consumers, let the producer block when no consumers can take work --- I decided to roll my own solution. It is not an ExecutorService but just an Executor. And it does not adapt the number of threads to the work load, but holds a fixed number of threads only, which also fits my requirements. Here is the code. Feel free to rant about it :-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
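
A usage sketch for this class (the thread count and tasks are illustrative):

ParallelExecutor executor = new ParallelExecutor(4);
for (int i = 0; i < 100; i++) {
    final int n = i;
    // put() on the SynchronousQueue blocks until one of the 4 workers is free
    executor.execute(() -> System.out.println("task " + n));
}
executor.interruptAndJoinAll(); // note: may preempt tasks that honor interruption
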
Suribachi answered 26/7, 2014 at 18:7 Comment(0)
0

I believe there is a quite elegant way to solve this problem, by using java.util.concurrent.Semaphore and delegating to an executor created by Executors.newFixedThreadPool. The new executor service will only execute a new task when there is a thread to do so. Blocking is managed by the Semaphore, whose number of permits equals the number of threads. When a task is finished it returns a permit.

import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

    private final ExecutorService executor;
    private final Semaphore blockExecution;

    public FixedThreadBlockingExecutorService(int nThreads) {
        this.executor = Executors.newFixedThreadPool(nThreads);
        blockExecution = new Semaphore(nThreads);
    }

    @Override
    public void shutdown() {
        executor.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return executor.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return executor.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return executor.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return executor.awaitTermination(timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        blockExecution.acquireUninterruptibly(); // blocks until a thread is free
        executor.execute(() -> {
            try {
                command.run();
            } finally {
                blockExecution.release();
            }
        });
    }
}
Specious answered 24/1, 2015 at 22:21 Comment(1)
I implemented the BoundedExecutor described in Java Concurrency in Practice and figured out that the Semaphore must be initialized with the fairness flag set to true in order to ensure that Semaphore permits are offered in the order requests are made. Refer docs.oracle.com/javase/7/docs/api/java/util/concurrent/…. for detailsChenay
0

I had the same need in the past: a kind of blocking queue with a fixed size for each client backed by a shared thread pool. I ended up writing my own kind of ThreadPoolExecutor:

UserThreadPoolExecutor (blocking queue (per client) + threadpool (shared amongst all clients))

See: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Each UserThreadPoolExecutor is given a maximum number of threads from a shared ThreadPoolExecutor

Each UserThreadPoolExecutor can:

  • submit a task to the shared thread pool executor if its quota is not reached. If its quota is reached, the job is queued (non-consumptive blocking waiting for CPU). Once one of its submitted tasks is completed, the quota is decremented, allowing another waiting task to be submitted to the ThreadPoolExecutor (see the sketch after this list)
  • wait for the remaining tasks to complete
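The idea can be sketched roughly like this (a hypothetical, simplified ClientExecutor; names are illustrative and the real implementation is in the repository linked above):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

// Each client gets its own quota semaphore over one shared pool.
class ClientExecutor {
    private final ExecutorService sharedPool;
    private final Semaphore quota; // max concurrent tasks for this client

    ClientExecutor(ExecutorService sharedPool, int maxThreads) {
        this.sharedPool = sharedPool;
        this.quota = new Semaphore(maxThreads);
    }

    void submit(Runnable task) throws InterruptedException {
        quota.acquire(); // blocks this client when its quota is reached
        sharedPool.execute(() -> {
            try {
                task.run();
            } finally {
                quota.release(); // quota freed once the task completes
            }
        });
    }
}
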
Monostrophe answered 31/7, 2015 at 14:19 Comment(0)
0

I recently had a need to achieve something similar, but on a ScheduledExecutorService.

I also had to ensure that I handle the delay passed to the method, ensuring that either the task is submitted to execute at the time the caller expects, or it just fails by throwing a RejectedExecutionException.

Other methods from ScheduledThreadPoolExecutor that execute or submit a task internally call #schedule, which will still in turn invoke the overridden methods.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        // Wait at most 'delay' ms for a queue permit; if none becomes available
        // in time, fail instead of silently over-filling the queue.
        if (!maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS)) {
            throw new RejectedExecutionException("timed out waiting for a queue permit");
        }
        final long elapsedMs = System.currentTimeMillis() - beforeAcquireTimeStamp;
        // Shorten the delay by the time spent waiting, so the task still runs
        // at the time the caller expected.
        return Math.max(0, delay - elapsedMs);
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

I have the code here and will appreciate any feedback. https://github.com/AmitabhAwasthi/BlockingScheduler

Weis answered 2/10, 2016 at 16:55 Comment(2)
This answer relies completely on the content of external links. Should they ever become invalid, your answer would be useless. So please edit your answer and add at least a summary of what can be found there. Thank you!Elisavetpol
@fabio : thanks for pointing out. I've added the code in there so that now it makes more sense for readers. Appreciate your comment :)Weis
0

I don't always like the CallerRunsPolicy, especially since it allows the rejected task to 'skip the queue' and get executed before tasks that were submitted earlier. Moreover, executing the task on the calling thread might take much longer than waiting for the first slot to become available.

I solved this problem using a custom RejectedExecutionHandler, which simply blocks the calling thread for a little while and then tries to submit the task again:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }

        executor.execute(r);
    }
}

This class can just be used in the thread-pool executor as a RejectedExecutionHandler like any other, for example:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

The only downside I see is that the calling thread might get locked slightly longer than strictly necessary (up to 250ms). Moreover, since this executor is effectively being called recursively, very long waits for a thread to become available (hours) might result in a stack overflow.

Nevertheless, I personally like this method. It's compact, easy to understand, and works well.

Bruit answered 6/11, 2017 at 22:2 Comment(2)
As you say yourself: this can create a stackoverflow. Not something I would want to have in production code.Suribachi
Everybody should make their own decisions. For my workload, this is not an issue. Tasks run in seconds instead of the hours that would be required to blow the stack. Moreover, the same can be said for virtually any recursive algorithm. Is that a reason to never use any recursive algorithm in production?Bruit
0
  1. CallerRunsPolicy is not the best choice here. It will block the caller thread until completion of the rejected task.

From docs:

A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.

Code Example:

var blockingQueue = new ArrayBlockingQueue<Runnable>(10); // capacity is an assumed value
var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerRunsPolicy());
  2. Use CallerBlocksPolicy from spring-integration. It will block the caller thread until the queue is ready to receive a new task.

From docs:

A {@link RejectedExecutionHandler} that blocks the caller until the executor has room in its queue, or a timeout occurs (in which case a {@link RejectedExecutionException} is thrown).

Code Example:

var blockingQueue = new ArrayBlockingQueue<Runnable>(10); // capacity is an assumed value
var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerBlocksPolicy(99999));
Unamerican answered 27/4 at 18:45 Comment(0)
-1

I found this rejection policy in the Elasticsearch client. It blocks the caller thread on the blocking queue. Code below:

static class ForceQueuePolicy implements XRejectedExecutionHandler
{
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    {
        try
        {
            executor.getQueue().put(r);
        }
        catch (InterruptedException e)
        {
            // should never happen since we never wait
            throw new EsRejectedExecutionException(e);
        }
    }

    @Override
    public long rejected()
    {
        return 0;
    }
}
Intrusion answered 2/9, 2016 at 10:39 Comment(0)
