Java: Thread producer consumer what is the most efficient way to wait for data to be produced

When using BlockingQueue to consume data that is produced what is the most efficient method for waiting for the data to appear?

Scenario:

Step 1) The data list is a data store to which timestamps are added. The timestamps must be ordered by priority, closest to the current time first. The list may be empty. One thread inserts the timestamps into it. (Produce)

Step 2) Another thread consumes this data: it takes the timestamps from data and checks whether they are after the current time. (Consume, then produce)

Step 3) If they are after the current time, they are sent on to a third thread to be consumed and processed. Once a timestamp has been processed there, it is removed from the Step 1 data store. (Consume, then edit the original list)

In the code below, the data field is the data store from Step 1. results is the queue of timestamps that Step 2 found to be after the current time; it is then consumed in Step 3.

private BlockingQueue<LocalTime> data;
private final LinkedBlockingQueue<Result> results = new LinkedBlockingQueue<Result>();

@Override
public void run() {
  while (!data.isEmpty()) {
    for (LocalTime dataTime : data) {
      if (new LocalTime().isAfter(dataTime)) {
        results.put(result);
      }
    }
  }
}

Question: What is the most efficient way to wait for data to be added to the data list, which may be empty? Focus on:

while (!data.isEmpty())

Following from an earlier question.

Perspex answered 24/7, 2015 at 11:6 Comment(11)
Use data.take()!=null. The isEmpty call is non-blocking.Separatrix
Yeah, well, what is the point of using a blocking queue if you are going to poll it?Furriery
Your code says !data.isEmpty() which means you are looping till there is some data in your data list (waiting for data to be consumed). But your requirement says - "wait for data to be added in the data list that could be potentially empty." What is your requirement here?Pavior
@hagrawal I have added more contextPerspex
So you mean you want to wait for data to be produced from results.put(result); ?? I think this would be the very initial case. What will happen once some data is produced. As per step3, you want it to be consumed by some thread, and lets say 1 data is produced, and it is consumed, now again same thread would be waiting for next item to be produced? Key thing is - is it the same thread or different thread which would wait for data to be produced from results.put(result); OR thread will keep on consuming, but if data is not there then it should wait? Single or multi thread part is not clear.Pavior
@hagrawal it would be a different thread.Perspex
Ok, I see, I just edited my question above.Pavior
@hagrawal. There are three threads, one that produces in the example code the field data. Another thread that is working on taking the timestamps from data and then putting them on results this code. Then another thread that consumes the results queue and does processing to the modified data in the results.Perspex
@hagrawal If the data is not there it should wait for it.Perspex
This part is clear, what is not clear is whether you want only 3 threads doing all these jobs in a loop or it would be different thread. More specifically in the last case - whether it is the same thread or a new thread each time which would consume the data and wait for data to be produced from results.put(result); OR same single thread will keep on consuming and if data is not there then it should wait?Pavior
@hagrawal "OR same single thread will keep on consuming and if data is not there then it should wait" is how i imagined it.Perspex

what is the most efficient way to wait for data to be produced

A BlockingQueue has blocking operations that suspend a thread while it waits for the queue to become non-empty (when taking) or non-full (when putting). In your case you are spinning on the queue, which consumes CPU. This is not preferred.

You should use take.

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

BlockingQueue#take

This is the most efficient way to wait for elements from a queue, because suspended threads consume no CPU. Once a new item is put on the queue, a waiting thread is woken.

To hand items on, you can use put, which has the mirror-image waiting semantics: it blocks only while the queue is full. A LinkedBlockingQueue created without a capacity is effectively unbounded, so put on it does not block in practice.

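public void run() {
   try {
      // take() blocks until an element is available and never returns null,
      // so loop until the thread is interrupted.
      while (!Thread.currentThread().isInterrupted()) {
         LocalTime timestamp = data.take();
         ...
      }
   } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag and exit
   }
}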
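
To make the three-thread flow from the question concrete, here is a minimal sketch of the middle stage blocking in take() and forwarding with put(). It is only an illustration under assumptions: Long values stand in for the timestamps so the example needs no date library, and the class name TakePipelineSketch and the POISON shutdown marker are hypothetical names that do not appear in the question. It is written in Java 7 syntax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TakePipelineSketch {
    // Hypothetical shutdown marker; not part of the question's code.
    static final Long POISON = Long.MIN_VALUE;

    // Starts the middle stage: moves timestamps from 'data' to 'results'.
    static Thread startForwarder(final BlockingQueue<Long> data,
                                 final BlockingQueue<Long> results) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        Long timestamp = data.take(); // parks while 'data' is empty
                        results.put(timestamp);       // would block only if 'results' were bounded and full
                        if (timestamp.equals(POISON)) {
                            return;                   // marker forwarded, stop
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                }
            }
        });
        t.start();
        return t;
    }

    // Produces three timestamps, then returns what the forwarder passed on.
    static List<Long> runDemo() {
        BlockingQueue<Long> data = new LinkedBlockingQueue<Long>();
        BlockingQueue<Long> results = new LinkedBlockingQueue<Long>();
        Thread forwarder = startForwarder(data, results);
        try {
            data.put(10L);
            data.put(20L);
            data.put(30L);
            data.put(POISON);
            forwarder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Long> out = new ArrayList<Long>();
        results.drainTo(out);
        out.remove(POISON); // remove(Object): drops the marker
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [10, 20, 30]
    }
}
```

The forwarder never checks isEmpty(); it simply sleeps inside take() until the producer adds something, which is the behaviour the question is asking for.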

Updating based on our comments:

But in this case the timestamps are created in a sequential order and added. But a timestamp may be less in the future. E.g. Head node is 2 mins in future, Second node is 1 min, so the second node wants processing first

Then my follow up:

So you need priority queuing based on the timestamp of the LocalDate?

Not sure if you are using the LocalDate from JodaTime or Java 8, let's assume the latter.

You can use a PriorityBlockingQueue, which has the same blocking semantics for take. The difference is that a PriorityBlockingQueue hands out elements according to whatever ordering is defined rather than in insertion order. In your case, using the LocalDate, you can have elements ordered from oldest-to-youngest or youngest-to-oldest.

BlockingQueue<LocalDate> data = new PriorityBlockingQueue<>(); 

OR INVERSE THE ORDER

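// The initial capacity must be at least 1 (0 throws IllegalArgumentException); 11 is the default.
BlockingQueue<LocalDate> data = new PriorityBlockingQueue<>(11, (a, b) -> b.compareTo(a));

Either way, you will process the LocalDate values in the order defined by the queue (natural order in the first example, reversed in the second) and not in the order in which they were enqueued.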
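
As a rough illustration of that ordering, the sketch below enqueues values out of order and dequeues them with poll(). It rests on assumptions: Long values stand in for the timestamps, the class and method names (PriorityOrderSketch, drainInPriorityOrder) are hypothetical, and an anonymous Comparator is used instead of a lambda so that it also compiles on Java 7.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderSketch {
    // Enqueues the given timestamps, then dequeues them in priority order.
    static List<Long> drainInPriorityOrder(boolean reversed, long... timestamps) {
        Comparator<Long> order = new Comparator<Long>() {
            @Override
            public int compare(Long a, Long b) {
                return a.compareTo(b); // smallest (earliest) first
            }
        };
        if (reversed) {
            order = Collections.reverseOrder(order); // largest (latest) first
        }
        // Initial capacity must be at least 1; 11 is the library default.
        PriorityBlockingQueue<Long> queue = new PriorityBlockingQueue<Long>(11, order);
        for (long t : timestamps) {
            queue.put(t); // never blocks: the queue is unbounded
        }
        List<Long> out = new ArrayList<Long>();
        Long next;
        while ((next = queue.poll()) != null) { // poll() returns the current head
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder(false, 120L, 60L, 90L)); // [60, 90, 120]
        System.out.println(drainInPriorityOrder(true, 120L, 60L, 90L));  // [120, 90, 60]
    }
}
```

Note that only the removal methods (take, poll) respect the ordering; iterating over a PriorityBlockingQueue does not guarantee any particular order.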

If you are using JodaTime's LocalDate you may need to implement your own Comparator similar to my second example.

EDIT: just realized you had this tagged as java-7. So you will use the JodaTime and if the JodaTime LocalDate does not implement Comparable, just create your own.

Eardrum answered 24/7, 2015 at 13:16 Comment(6)
Thanks I think in my case now looking at it using data.peek() is more appropriate as the first node is not necessarily the one that wants to be processed.Perspex
Just curious, why wouldn't the first node be processed?Eardrum
I do think this will most likely be accepted in a general sense. But in this case the timestamps are created in a sequential order and added. But a timestamp may be less in the future. E.g. Head node is 2 mins in future, Second node is 1 min, so the second node wants processing first.Perspex
@Stuart So you need priority queuing based on the timestamp of the LocalDate?Eardrum
Thank you for being proactive and asking questions. Really good solution.Perspex
@Stuart You may be aware, and if not then - under the hoods in take() for all the implementations of BlockingQueue, wait is achieved through java.util.concurrent.locks.Condition. This allows to wait based on some condition, and in case of take() it waits if there is no item in the list. --- notEmpty.await(); where notEmpty is private final Condition notEmpty = takeLock.newCondition();. So, basically if you want to wait on some condition in situations like this then you can consider using java.util.concurrent.locks.ConditionPavior
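
For readers curious about the Condition mechanism mentioned in the last comment, here is a minimal sketch of the same wait-until-not-empty pattern. It is not the JDK's implementation, only an approximation of the idea used by queues such as LinkedBlockingQueue; the class name ConditionMailbox and the demo method are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionMailbox<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<T> items = new ArrayDeque<T>();

    // Adds an item and wakes one thread waiting in take().
    public void put(T item) {
        lock.lock();
        try {
            items.add(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Waits (without spinning) until an item is available, then removes it.
    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) { // loop guards against spurious wakeups
                notEmpty.await();     // releases the lock while waiting
            }
            return items.remove();
        } finally {
            lock.unlock();
        }
    }

    // Starts a consumer, then produces one item and returns what was received.
    static String demo() {
        final ConditionMailbox<String> box = new ConditionMailbox<String>();
        final String[] received = new String[1];
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    received[0] = box.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        box.put("hello");
        try {
            consumer.join(); // join makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello
    }
}
```

In practice the existing BlockingQueue implementations already do this for you, so a hand-rolled Condition is mainly useful when the thing being waited on is not a queue.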

First, you need to use the take method. This method will block while the queue is empty. This will replace your check to see if the queue is empty.

Second, why do you need a timestamp? If the timestamp is there to make sure you process the requests in the order that they were put into the queue, then you do not need it, as the queue is FIFO and made for concurrent multithreaded environments. If the timestamp comes from outside of the system (some external timestamp), where the requests might arrive out of order but then need to be processed in order, then a plain BlockingQueue will not cut it. You might need a PriorityBlockingQueue, where you will prioritize the requests by timestamp. So either get rid of the timestamp or use a PriorityBlockingQueue.

Proptosis answered 24/7, 2015 at 14:35 Comment(0)
