java BlockingQueue does not have a blocking peek?
Asked Answered
P

8

41

I have a blocking queue of objects.

I want to write a thread that blocks till there is a object on the queue. Similar to the functionality provided by BlockingQueue.take().

However, since I do not know if I will be able to process the object successfully, I want to just peek() and not remove the object. I want to remove the object only if I am able to process it successfully.

So, I would like a blocking peek() function. Currently, peek() just returns if the queue is empty as per the javadocs.

Am I missing something? Is there another way to achieve this functionality?

EDIT:

Any thoughts on if I just used a thread safe queue and peeked and slept instead?

public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

Note that I only have one consumer thread and one (separate) producer thread. I guess this isn't as efficient as using a BlockingQueue... Any comments appreciated.

Pruter answered 18/11, 2009 at 23:11 Comment(0)
W
16

You could use a LinkedBlockingDeque and physically remove the item from the queue (using takeLast()) but replace it again at the end of the queue if processing fails using putLast(E e). Meanwhile your "producers" would add elements to the front of the queue using putFirst(E e).

You could always encapsulate this behaviour within your own Queue implementation and provide a blockingPeek() method that performs takeLast() followed by putLast() behind the scenes on the underlying LinkedBlockingDeque. Hence from the calling client's perspective the element is never removed from your queue.

Ware answered 18/11, 2009 at 23:26 Comment(4)
This is a good suggestion. The only issue I can see here is that if the queue fills up while I am processing an item, then I will not be able to queue the current item back.Pruter
You could get round this by using additional synchronization in your wrapper implementation, therefore making the take + put an atomic operation. You could also use an unbounded queue.Ware
I would recommend against removing and re-adding because then you have the trouble of exposing the queue's state changes to other threads. Maybe use a busy polling for implementing the peek(). Or use a semaphore tied to the queue in your wrapper if you do not want to poll.Tonguelashing
@Ustaman: Yes I agree. Anyone encapsulating this behavior behind their own Queue implementation would have to handle this. It's not a great solution because you'd probably require a global lock in addition to the ReentrantLock defined in LinkedBlockingDeque (given that this is package-private). In the OP's position I would be tempted to see whether this "peek" behavior can be avoided completely by reworking the design.Ware
K
6

However, since I do not know if I will be able to process the object successfully, I want to just peek() and not remove the object. I want to remove the object only if I am able to process it successfully.

In general, it is not thread-safe. What if, after you peek() and determine that the object can be processed successfully, but before you take() it to remove and process, another thread takes that object?

Knotweed answered 18/11, 2009 at 23:39 Comment(1)
IMHO you can solve this issue by wrapping and putting some sync. logicEwan
S
2

Could you also just add an event listener queue to your blocking queue, then when something is added to the (blocking)queue, send an event off to your listeners? You could have your thread block until it's actionPerformed method was called.

Scutt answered 18/11, 2009 at 23:17 Comment(0)
A
2

The only thing I'm aware of that does this is BlockingBuffer in Apache Commons Collections:

If either get or remove is called on an empty Buffer, the calling thread waits for notification that an add or addAll operation has completed.

get() is equivalent to peek(), and a Buffer can be made to act like BlockingQueue by decorating a UnboundedFifoBuffer with a BlockingBuffer

Ascension answered 18/11, 2009 at 23:18 Comment(0)
W
2

Not an answer per se, but: JDK-6653412 claims this is not a valid use case.

Woermer answered 14/9, 2018 at 16:38 Comment(1)
Could you add a bit more information from the ressource here? That would make it an acceptable answer, IMO.Micahmicawber
E
1

The quick answer is, not there's not really a way have a blocking peek, bar implementing a blocking queue with a blocking peek() yourself.

Am I missing something?

peek() can be troublesome with concurrency -

  • If you can't process your peek()'d message - it'll be left in the queue, unless you have multiple consumers.
  • Who is going to get that object out of the queue if you can't process it ?
  • If you have multiple consumers, you get a race condition between you peek()'ing and another thread also processing items, resulting in duplicate processing or worse.

Sounds like you might be better off actually removing the item and process it using a Chain-of-responsibility pattern

Edit: re: your last example: If you have only 1 consumer, you will never get rid of the object on the queue - unless it's updated in the mean time - in which case you'd better be very very careful about thread safety and probably shouldn't have put the item in the queue anyway.

Elviraelvis answered 18/11, 2009 at 23:36 Comment(0)
F
0

Looks like BlockingQueue itself doesn't have the functionality you're specifying.

I might try to re-frame the problem a little though: what would you do with objects you can't "process correctly"? If you're just leaving them in the queue, you'll have to pull them out at some point and deal with them. I'd reccommend either figuring out how to process them (commonly, if a queue.get() gives any sort of invalid or bad value, you're probably OK to just drop it on the floor) or choosing a different data structure than a FIFO.

Fairlie answered 18/11, 2009 at 23:25 Comment(0)
Y
0

The 'simplest' solution

Do not process the next element until the previous element is processed succesfully.

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
        
        boolean successful = process(obj);
        
        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. Calling peek() and checking if the value is null is not CPU efficient.

I have seen CPU usage going to 10% on my system when the queue is empty for the following program.

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}
  1. Adding sleep() adds slowness.

  2. Adding it back to the queue using putLast will disturb the order. Moreover, it is a blocking operation which requires locks.

Yalonda answered 11/9, 2019 at 13:19 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.