Is adding tasks to BlockingQueue of ThreadPoolExecutor advisable?

The JavaDoc for ThreadPoolExecutor is unclear on whether it is acceptable to add tasks directly to the BlockingQueue backing the executor. The docs say calling executor.getQueue() is "intended primarily for debugging and monitoring".

I'm constructing a ThreadPoolExecutor with my own BlockingQueue. I retain a reference to the queue so I can add tasks to it directly. Since the same queue is returned by getQueue(), I assume the admonition in the getQueue() docs also applies to a reference to the backing queue acquired by my own means.

Example

The general pattern of the code is:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads(); // required: nothing else will start the worker threads
// ...
while (...) {
    Runnable job = ...;
    // blocking submission: waits up to an hour for queue space (InterruptedException handling elided)
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() vs executor.execute()

As I understand it, the typical use is to add tasks via executor.execute(). The approach in my example above has the benefit of blocking on the queue, whereas execute() fails immediately when the queue is full and rejects my task (with core size equal to max size there are no spare threads to absorb it). I also like that submitting jobs interacts directly with a blocking queue; it feels like a more "pure" producer-consumer arrangement to me.

An implication of adding tasks to the queue directly is that I must call prestartAllCoreThreads(), because otherwise no worker threads will be running. Assuming no other interaction with the executor, nothing will be monitoring the queue (examination of the ThreadPoolExecutor source confirms this). It also implies that, for direct enqueuing, the ThreadPoolExecutor must be configured with more than zero core threads and must not allow core threads to time out.
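For illustration, those preconditions could be checked in code before entering the submission loop. This is only a sketch against the executor from the example above, not part of the original program:

if (executor.getCorePoolSize() == 0 || executor.allowsCoreThreadTimeOut()) {
    // zero core threads, or core threads that may time out, would leave nothing waiting on the queue
    throw new IllegalStateException("direct enqueuing requires permanent core threads");
}
int started = executor.prestartAllCoreThreads(); // returns the number of threads actually started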

tl;dr

Given a ThreadPoolExecutor configured as follows:

  • core threads > 0
  • core threads aren't allowed to time out
  • core threads are prestarted
  • a reference to the BlockingQueue backing the executor is held

Is it acceptable to add tasks directly to the queue instead of calling executor.execute()?

Related

This question (producer/consumer work queues) is similar, but doesn't specifically cover adding to the queue directly.

Coypu answered 7/4, 2011 at 18:41 Comment(2)
"I retain a reference to the queue so I can add tasks to it directly" but why would you do that? Why not just submit them to the executor?Precisian
@Raedwald, see above where I write "The approach in my example above has the benefit of blocking on the queue..." – Coypu

If it were me, I would prefer using Executor#execute() over Queue#offer(), simply because I'm using everything else from java.util.concurrent already.

Your question is a good one, and it piqued my interest, so I took a look at the source for ThreadPoolExecutor#execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

We can see that execute itself calls offer() on the work queue, but not before doing some nice, tasty pool manipulations if necessary. For that reason, I'd think that it'd be advisable to use execute(); not using it may (although I don't know for certain) cause the pool to operate in a non-optimal way. However, I don't think that using offer() will break the executor - it looks like tasks are pulled off the queue using the following (also from ThreadPoolExecutor):

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

This getTask() method is just called from within a loop, so if the executor isn't shutting down, it blocks until a new task is available on the queue, regardless of where that task came from.
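As a quick sanity check of that behaviour, a task offered straight to the queue does get picked up by a prestarted worker. This is only an illustrative sketch, not code from the question or the JDK:

BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, q);
tpe.prestartAllCoreThreads();          // the single core thread is now blocked in workQueue.take()
q.offer(new Runnable() {
    public void run() { System.out.println("ran via direct offer"); }
});
tpe.shutdown();                        // lets the already-queued task finish before the pool exits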

Note: Even though I've posted code snippets from source here, we can't rely on them for a definitive answer - we should only be coding to the API. We don't know how the implementation of execute() will change over time.

Fun answered 7/4, 2011 at 18:53 Comment(0)

One trick is to implement a custom subclass of ArrayBlockingQueue and override the offer() method to call your blocking version; that way you can still use the normal code path.

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // restore the interrupt status for the caller and fall through to return false
      Thread.currentThread().interrupt();
    }
    return false;
  }
};
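With that subclass in place, the normal submission path can stay on the executor API. A sketch of the wiring (nThreads and job are placeholders, not from the original answer):

ThreadPoolExecutor executor =
        new ThreadPoolExecutor(nThreads, nThreads, 1, TimeUnit.HOURS, queue);
// ...
executor.execute(job); // when the queue is full, this blocks inside the overridden offer() for up to an hour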

(As you can probably guess, I think calling offer() directly on the queue as your normal code path is probably a bad idea.)

Joanjoana answered 7/4, 2011 at 19:32 Comment(0)

One can actually configure the behavior of the pool when the queue is full by specifying a RejectedExecutionHandler at instantiation. ThreadPoolExecutor defines four policies as inner classes: AbortPolicy, DiscardOldestPolicy, DiscardPolicy, and my personal favorite, CallerRunsPolicy, which runs the new job in the submitting thread.

For example:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // fair = true: waiting producer/consumer threads are granted access in FIFO order
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

The behavior desired in the question can be obtained using something like:

public class BlockingPolicy implements RejectedExecutionHandler {
    @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r); // Self-contained: no external queue reference needed.
        } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

At some point the queue must be accessed. The best place to do so is in a self-contained RejectedExecutionHandler, which avoids the code duplication and potential bugs that come from manipulating the queue directly wherever the pool object is used. Note that the handlers included in ThreadPoolExecutor themselves use getQueue().
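Plugging the handler in is then just a change to the last constructor argument shown earlier; a sketch, reusing the hypothetical BlockingPolicy above and the nproc placeholder from the first snippet:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, nproc, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true),
        new BlockingPolicy()); // a full queue now blocks the submitting thread instead of rejecting the task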

Bronchia answered 30/8, 2013 at 7:0 Comment(1)
I like that you present another alternative. My perspective at the time I was working on this was that the ThreadPoolExecutor was an implementation detail. I can model producer/consumer with blocking queues and swap out the implementation of how the work is scheduled without changing client code. – Coypu

It's a very important question if the queue you're using is a completely different implementation from the standard in-memory LinkedBlockingQueue or ArrayBlockingQueue.

For instance, if you're implementing the producer-consumer pattern with several producers on different machines and a queuing mechanism backed by a separate persistence subsystem (like Redis), then the question becomes relevant in its own right, even if you don't want a blocking offer() like the OP does.

So the given answer, that prestartAllCoreThreads() has to be called (or prestartCoreThread() called enough times) for the worker threads to be available and running, is important enough to be stressed.
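In code, that comes down to something like the following sketch, where externalQueue stands for whatever BlockingQueue<Runnable> adapter fronts the external system and nWorkers is a placeholder:

ThreadPoolExecutor consumers =
        new ThreadPoolExecutor(nWorkers, nWorkers, 1, TimeUnit.HOURS, externalQueue);
consumers.prestartAllCoreThreads();            // without this, nothing ever polls the external queue
// or start them one at a time:
// while (consumers.prestartCoreThread()) { }  // each call starts one core thread until all are running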

Gignac answered 25/10, 2013 at 11:48 Comment(0)

If required, we can also use a "parking lot" that separates main processing from rejected tasks:

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);      // counted down once per rejected task
    final List<Runnable> taskParking = new LinkedList<Runnable>();         // the "parking lot" for rejected tasks
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1); // deliberately tiny, so rejections are frequent
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for (int i = 0; i < TASKCOUNT; i++) {
        // main submission path; tasks the pool can't accept are diverted to the parking lot by the handler
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5, TimeUnit.SECONDS); // bounded wait; the latch only reaches zero if every task was rejected
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();
Sanasanabria answered 17/6, 2015 at 12:2 Comment(0)
