Producer Consumer - Using Executors.newFixedThreadPool
C

3

7

My understanding of a Producer-Consumer pattern is that it could be implemented using a queue shared between the producer and the consumer. Producer submits work to a shared queue, consumer retrieves it and processes it. It could also be implemented by the producer directly submitting to the consumer (Producer threads submitting to Consumer's executor service directly). 

Now, I've been looking at the Executors class that provides some common implementations of thread pools. The method newFixedThreadPool, according to the spec, "reuses a fixed number of threads operating off a shared unbounded queue". Which queue are they talking about here? 

If the Producer directly submits a task to a consumer, is it the internal queue of the ExecutorService that contains the list of Runnables?

Or is it the intermediate queue, in case the producer submits to a shared queue? 

Maybe I'm missing the whole point, but could someone please clarify?

Cobaltous answered 11/8, 2011 at 19:55 Comment(0)
F
4

You are right: ExecutorService is not only a thread pool, it is a full producer-consumer implementation. The internal queue is in fact a thread-safe queue of Runnables (FutureTask instances, to be precise) holding the tasks you submit().

All idle threads in the pool block on that queue, waiting for tasks to execute. When you submit() a task, exactly one thread picks it up and runs it. Of course, submit() returns immediately; it does not wait for a pool thread to finish processing the task.

On the other hand, if you submit a huge number of tasks (or long-running ones), you might end up with all threads in the pool occupied and some tasks waiting in the queue. Once any thread is done with its task, it immediately takes the next one from the head of the queue.
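
The queueing behaviour described above can be observed with a small sketch. It builds a ThreadPoolExecutor directly with a fixed thread count and an unbounded LinkedBlockingQueue, which is assumed here to mirror what Executors.newFixedThreadPool sets up; the class name FixedPoolQueueDemo and the method queuedWhileBusy are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolQueueDemo {

    // Submits `tasks` blocking tasks to a pool of `threads` workers and returns
    // how many tasks sit in the pool's internal queue while the workers are busy.
    static int queuedWhileBusy(int threads, int tasks) throws InterruptedException {
        // Assumption: same shape as a fixed thread pool (core == max, unbounded queue).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(Math.min(threads, tasks));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // submit() returns at once; the task runs (or waits) in the pool.
                pool.submit(() -> {
                    started.countDown();
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // every worker now holds one task
            return pool.getQueue().size();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Queued while workers are busy: " + queuedWhileBusy(2, 5));
    }
}
```

With 2 workers and 5 tasks, 2 tasks are running and the remaining 3 wait in the queue, so the producer (the submitting thread) never has to manage a queue of its own.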

Faden answered 11/8, 2011 at 20:16 Comment(5)
Just to clarify: ExecutorService is just an interface. You could implement ExecutorService with a class that just runs each runnable as soon as it is submitted, in the same thread (and I believe there is an implementation in the java.util.concurrent package that does just that). But in practice, most ExecutorService implementations are a full producer-consumer implementation.Mauramauralia
You are absolutely right, by an ExecutorService I mean "the thing returned by Executors.newFixedThreadPool() that implements ExecutorService". Thanks for the clarification.Faden
Thanks guys. So, if I create an executor service using newFixedThreadPool(8) and then execute about 1000 runnable tasks on it, please confirm my understanding of the scenario: 1. at most 8 threads would be created 2. at the beginning of processing, while the 8 threads are busy, 992 of the tasks would be held in the internal queue 3. Also, because it's an unbounded queue, there is no upper limit on the number of tasks that I could submit to the executor service. What effect would the above scenario have if I create an ExecutorService with a bounded queue? Would it perform better? Thanks, O.Cobaltous
@Oxford: You are correct. Regarding a bounded queue: if the queue has a limit (let's say 100 entries), then once you hit 100 entries, any attempt to submit() a new Runnable will block until at least one task has finished and there is room in the queue again. This can be useful for rate-limiting the producer thread. In some cases, you don't want the producer thread to ever block, in which case you probably want an unbounded queue.Mauramauralia
@Daniel, submit() does not throw InterruptedException, so it will not block the producer. However, if the queue is full, the task is delegated to the RejectedExecutionHandler.Ionia
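
The bounded-queue case discussed in the comments above can be checked with a minimal sketch, assuming a ThreadPoolExecutor built by hand with an ArrayBlockingQueue and the default rejection handler (AbortPolicy). The class name BoundedQueueDemo and the method countRejected are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Submits `tasks` blocking tasks to a pool with `threads` workers and a
    // queue of `queueCapacity` slots; returns how many submissions were rejected.
    static int countRejected(int threads, int queueCapacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity)); // default: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.submit(() -> {
                        try {
                            release.await(); // hold the worker so nothing drains
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // submit() did not block; the handler refused the task instead.
                    rejected++;
                }
            }
            return rejected;
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Rejected: " + countRejected(1, 2, 5));
    }
}
```

With 1 worker, 2 queue slots and 5 submissions, one task runs, two wait, and two are rejected with RejectedExecutionException. If the goal is to slow the producer down rather than drop work, passing a ThreadPoolExecutor.CallerRunsPolicy as the handler makes the submitting thread run the overflow task itself.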
G
1
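import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Producer extends Thread {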
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

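    @Override
    public void run() {
        // ArrayList is not thread-safe, so take the same lock the consumers use.
        // The lock adds some contention, which makes adding to the list slower.
        synchronized (list) {
            list.add("object added to list is " + System.currentTimeMillis());
        }
    }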
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "
                        + Thread.currentThread().getName() + " is "
                        + Producer.list.get(0));
                // Remove the head of the list by index.
                Producer.list.remove(0);
            }  
        }  
    }  
}  
Girasol answered 12/6, 2012 at 9:59 Comment(0)
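
As an alternative to the synchronized list above, the "shared queue" variant from the question could be sketched with a BlockingQueue, which handles locking and waiting itself. This is a minimal sketch under stated assumptions, not a reworking of the answer's code: the class name BlockingQueueDemo, the method run and the "__STOP__" sentinel are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

    // Hypothetical sentinel that tells a consumer to stop.
    private static final String POISON = "__STOP__";

    // One producer puts `items` strings on a shared queue; `consumers` workers
    // take them off. Returns everything that was consumed.
    static List<String> run(int items, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        // Producer: enqueue the items, then one sentinel per consumer.
        pool.execute(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put("item-" + i);
                }
                for (int c = 0; c < consumers; c++) {
                    queue.put(POISON);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumers: take() blocks until an element is available.
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    for (String s = queue.take(); !s.equals(POISON); s = queue.take()) {
                        consumed.add(s);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumed " + run(10, 3).size() + " items");
    }
}
```

Here the executor only supplies the threads; the hand-off between producer and consumers goes through the explicit queue, so no polling schedule or manual synchronized block is needed.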
S
0

Check this out:
Producer-Consumer example in Java (RabbitMQ) (It's written for another library, but it's in Java and it demonstrates the concept clearly ;) )
Hope it helps!

P.S.: Actually, it has several examples, but you get the idea ;)

Spurge answered 11/8, 2011 at 22:47 Comment(0)
