BlockingQueue and putAll
Asked Answered
M

2

6

Does anybody know why java's BlockingQueue does not have a putAll method? Is there a problem with such a method? Any good ways around this problem without having to completely re-implement BlockingQueue?

Mesquite answered 2/7, 2010 at 1:28 Comment(0)
M
1
for (Item item : items) {
    queue.put(item);
}

3 lines, not sure thats totally re-implementing blocking queue.

I imagine it wants you to put 1 by 1 in case threads are waiting to read from it, they aren't waiting for you to finish reading them all.

Mailand answered 2/7, 2010 at 1:32 Comment(2)
I have a BlockingPriorityQueue and want the set of items to be put in one atomic action.Mesquite
Then you can do a synchronized (queue) around the for loop. But this isn't really how the queue is meant to be used (and if you are close to capacity, you may deadlock your program). If 1 atomic action is putting 100 items in the queue.. then your queue should be a queue<List<Item>> and you should put and retrieve the items in 100 unit chunks.Mailand
P
1

I found the same issue with ArrayBlockingQueue. We wanted some additional methods that were missing:

  • void putAll(Collection<? extends E> c) throws InterruptedException
  • int drainAtLeastOneTo(@OutputParam Collection<? super E> c) throws InterruptedException
  • int drainAtLeastOneTo(@OutputParam Collection<? super E> c, int maxElements) throws InterruptedException

Some people advocate to use BlockingQueue<List<E>>, but that requires malloc'ing lists. Sometimes you want to avoid it. Also, that assumes you want producer and consumer to use the same "chunk" sizes.

Regarding draining: Again, you may want to have a mismatch between chunk size of producer and consumer. So producer might insert single items, but consumer works on batches. drainTo() does not block, so drainAtLeastOneTo() was a solution.

In the end, we copied the default impl of ArrayBlockingQueue and added the methods directly. Again, a drawback is that you need to operate on a concrete type, instead of interface BlockingQueue.

You may also consider using the famed (infamous?) LMAX Disruptor, but the model is quite different from a standard BlockingQueue, as you do not control when items are consumed.

Presser answered 15/12, 2017 at 2:38 Comment(2)
Curious if that implementation is somewhere on github/SO and you ould add a link for reference? Thanks!Mandorla
Did you come to either the concurrency-interest or the OpenJDK core-libs-dev mailing list with this? I'd gladly read the discussion.Footrace

© 2022 - 2024 — McMap. All rights reserved.