Minimal mutexes for std::queue producer/consumer

I have two threads that work the producer and consumer sides of a std::queue. The queue is usually empty, so I'd like to avoid having the consumer grab the mutex that guards modifications to the queue.

Is it okay to call empty() outside the mutex then only grab the mutex if there is something in the queue?

For example:

#include <mutex>
#include <queue>

struct MyData{
   int a;
   int b;
};

class SpeedyAccess{
public:
   void AddDataFromThread1(MyData data){
      const std::lock_guard<std::mutex> queueMutexLock(queueAccess);
      workQueue.push(data);
   }

   void CheckFromThread2(){
      if(!workQueue.empty()) // Un-protected access...is this dangerous?
      {
         queueAccess.lock();
         MyData data = workQueue.front();
         workQueue.pop();
         queueAccess.unlock();

         ExpensiveComputation(data);
      }
   }

private:
   void ExpensiveComputation(MyData& data);

   std::queue<MyData> workQueue;
   std::mutex queueAccess;
};

Thread 2 does the check and isn't particularly time-critical, but will get called a lot (500/sec?). Thread 1 is very time-critical (a lot of other work needs to run there), but isn't called as frequently (max 20/sec).

If I add a mutex guard around empty() and the queue is empty when thread 2 checks it, thread 2 won't hold the mutex for long, so it might not be a big hit. However, since thread 2 checks so frequently, it might occasionally do so at the same moment thread 1 is pushing onto the back. Will this cause a substantial amount of waiting in thread 1?

Sinecure answered 16/5, 2022 at 14:10 Comment(3)
No, it's not okay to do unprotected access to a variable that may be modified by another thread at the same time. You need to protect the empty() call for the same reason you protect the front() call. - Til
You can use a std::condition_variable to allow the producer to notify the consumer when new data is available in the Q. This way the consumer will not have to poll the Q endlessly. - Abercrombie
The rule is: if you have an object that is shared by multiple threads, and at least one of them is a writer, then all access needs synchronization. Not doing so introduces a data race, which has undefined behavior. - Isobath

As written in the comments above, you should call empty() only under a lock.
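For comparison, here is a minimal sketch of the question's class with every queue access, including empty(), made under the lock. TryPop and the body of ExpensiveComputation are hypothetical additions (the latter, plus the processed counter, exist only so the sketch compiles and can be tested), and std::optional assumes C++17. The mutex is released before ExpensiveComputation runs, so thread 1 can only ever wait for an empty check, a front() and a pop(), never for the computation itself.

```cpp
#include <mutex>
#include <optional>
#include <queue>

struct MyData {
    int a;
    int b;
};

class SpeedyAccess {
public:
    void AddDataFromThread1(MyData data) {
        const std::lock_guard<std::mutex> lock(queueAccess);
        workQueue.push(data);
    }

    // Hypothetical helper: checks and pops under one lock; never waits for data.
    std::optional<MyData> TryPop() {
        const std::lock_guard<std::mutex> lock(queueAccess);
        if (workQueue.empty()) {
            return std::nullopt;
        }
        MyData data = workQueue.front();
        workQueue.pop();
        return data;
    }  // mutex released here, before any expensive work

    void CheckFromThread2() {
        if (std::optional<MyData> data = TryPop()) {
            ExpensiveComputation(*data);  // runs without holding the mutex
        }
    }

    int processed = 0;  // hypothetical: only here so the sketch can be tested

private:
    // Hypothetical stand-in for the real computation.
    void ExpensiveComputation(MyData& data) { processed += data.a + data.b; }

    std::queue<MyData> workQueue;
    std::mutex queueAccess;
};
```

Because the lock_guard objects release the mutex on every exit path, this also avoids the manual lock()/unlock() pair, which would leave the mutex locked if front() or the copy threw.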

But I believe there is a better way to do it.
You can use a std::condition_variable together with a std::mutex to synchronize access to the queue without locking the mutex more often than you must.

However, when using a std::condition_variable, you must be aware that it is subject to spurious wakeups: wait() can return even though no thread called notify. You can read about it here: Spurious wakeup - Wikipedia.
You can see some code examples here: Condition variable examples.
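To make the spurious-wakeup point concrete: the predicate overload of wait() used in the consumer of the example that follows behaves like the explicit loop in this sketch. wait_and_pop is a hypothetical helper; mtx, cond_var and q mirror the names in the answer's example, with MyData taken to be int.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

std::mutex mtx;
std::condition_variable cond_var;
std::queue<int> q;

// Same behavior as: cond_var.wait(lck, []() { return !q.empty(); });
// The condition is checked before the first wait and again after every
// wakeup, so a spurious wakeup just sends the thread back to waiting.
int wait_and_pop() {
    std::unique_lock<std::mutex> lck(mtx);
    while (q.empty()) {
        cond_var.wait(lck);  // releases mtx while blocked, reacquires it before returning
    }
    int val = q.front();
    q.pop();
    return val;
}
```

The loop is also why the producer must change the queue under the same mutex before notifying: the consumer only trusts the queue's state, never the wakeup itself.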

The correct way to use a std::condition_variable is demonstrated below (with some comments). This is just a minimal example to show the principle.

#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>

using MyData = int;

std::mutex mtx;
std::condition_variable cond_var;
std::queue<MyData> q;

void producer()
{
    MyData produced_val = 0;
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));   // simulate some pause between productions
        ++produced_val;
        std::cout << "produced: " << produced_val << std::endl;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            q.push(produced_val);
            cond_var.notify_all();  // Notifying under the lock is not required, but it may be more efficient (see @DavidSchwartz's comment below).
        }
    }
}

void consumer()
{
    while (true)
    {
        MyData consumed_val;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            // NOTE: wait() first checks the predicate while holding the lock.
            //       If the Q is not empty, it returns immediately.
            //       Otherwise it atomically releases the lock and blocks until woken
            //       (by `notify` or by a spurious wakeup), then reacquires the lock
            //       and checks the predicate again.
            //       When wait() returns, the lock is held until lck goes out of scope.
            //       See the documentation for std::condition_variable.
            cond_var.wait(lck, []() { return !q.empty(); }); // will loop internally to handle spurious wakeups
            consumed_val = q.front();
            q.pop();
        }
        std::cout << "consumed: " << consumed_val << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));    // simulate some calculation
    }
}

int main()
{
    std::thread p(producer);
    std::thread c(consumer);
    // Both threads run forever in this example, so join() never returns.
    // Joining (rather than spinning in an empty while(true) loop) keeps main
    // from burning a CPU core.
    p.join();
    c.join();
    return 0;
}

Some notes:

  1. In your real code, none of the threads should run forever. You should have some mechanism to notify them to gracefully exit.
  2. The global variables (mtx, q, etc.) would be better as members of some context class, or passed to producer() and consumer() as parameters.
  3. For simplicity, this example assumes that the producer's production rate is always low relative to the consumer's rate. In your real code you can make it more general by having the consumer extract all elements in the Q each time the condition_variable is signaled.
  4. You can "play" with the sleep_for durations of the producer and consumer to test various timing cases.
  5. For more complex data types that benefit from move semantics, you can change:
    consumed_val = q.front();
    
    to:
    consumed_val = std::move(q.front());
    
    for better efficiency (since the front element in the queue is popped right afterwards anyway).
  6. The code above assumes that MyData is default constructible.
    If this is not the case, you can use an immediately invoked lambda in the loop body of the consumer:
    MyData consumed_val = 
         [](){ std::unique_lock<std::mutex> lck(mtx);
               cond_var.wait(lck, []() { return !q.empty(); }); 
               MyData val = std::move(q.front()); 
               q.pop(); 
               return val; }
         ();   // <-- immediately invoke the lambda to initialize consumed_val
    
    The usage of std::move together with NRVO should make it efficient.
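Notes 1, 2, 3 and 5 can be combined into one sketch, assuming C++11 or later: a context class that owns the mutex, condition variable and queue, a stop flag for graceful exit, and a pop_all() that takes every queued element in one short critical section. The names WorkQueue, request_stop, pop_all and consume_until_stopped are hypothetical, not from the answer or any library. Swapping the whole queue out under the lock is just one possible way to implement note 3; popping in a loop under the lock would work too.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical context class replacing the globals (note 2).
template <typename T>
class WorkQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lck(mtx_);
        q_.push(std::move(value));
        cond_var_.notify_one();  // single consumer; notified under the lock
    }

    // Graceful exit (note 1): the consumer still drains what is already queued.
    void request_stop() {
        std::lock_guard<std::mutex> lck(mtx_);
        stop_ = true;
        cond_var_.notify_all();
    }

    // Blocks until there is data or a stop was requested, then takes every
    // queued element at once (note 3). An empty result means "stopped and
    // nothing left to do".
    std::vector<T> pop_all() {
        std::queue<T> taken;
        {
            std::unique_lock<std::mutex> lck(mtx_);
            cond_var_.wait(lck, [this] { return stop_ || !q_.empty(); });
            std::swap(taken, q_);  // the lock is held only for this swap
        }
        std::vector<T> out;
        out.reserve(taken.size());
        while (!taken.empty()) {
            out.push_back(std::move(taken.front()));  // note 5: move, then pop
            taken.pop();
        }
        return out;
    }

private:
    std::mutex mtx_;
    std::condition_variable cond_var_;
    std::queue<T> q_;
    bool stop_ = false;
};

// Hypothetical consumer loop: processes batches until stopped and drained.
// Returns how many items it handled (only so the sketch is testable).
template <typename T, typename F>
std::size_t consume_until_stopped(WorkQueue<T>& wq, F process) {
    std::size_t handled = 0;
    for (;;) {
        std::vector<T> batch = wq.pop_all();
        if (batch.empty()) {
            return handled;  // stop requested and queue drained
        }
        for (T& item : batch) {
            process(item);  // runs without holding the mutex
            ++handled;
        }
    }
}
```

Since the consumer holds the mutex only for the wait check and the swap, the producer's push() never has to wait for processing to finish; the trade-off is that items are handled in batches.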
Abercrombie answered 16/5, 2022 at 15:10 Comment(13)
You gratuitously make the notify_all more expensive by unlocking the mutex before calling it. If you call notify_all while holding the mutex, most modern implementations know that it cannot make any thread ready-to-run (because they need to acquire the mutex to make forward progress) and can make wait morphing optimizations. The way this is coded, both notify_all and the destructor of lck can make a thread ready-to-run, resulting in reduced performance. - Matthias
@DavidSchwartz good point. I'll change it like you suggested. - Abercrombie
What if MyData cannot be default-constructed, or the developer wants a const qualifier in the declaration? - Fulmis
@CaglayanDOKME where exactly do you see the problem with MyData without a default constructor? In this line consumed_val = q.front(); it is copied, not default constructed. And the producer can also work without default constructing. Or did I misunderstand your meaning? - Abercrombie
@Abercrombie MyData consumed_val; default-constructs. We could have a class that cannot be constructed this way. Consider MyData::MyData() = delete; - Fulmis
@CaglayanDOKME you are right. I wrote this answer a long time ago and missed that. What about using an immediately invoked lambda like you attempted in your question here? I mean something like MyData consumed_val = [...](...) { /* lambda that locks the mutex, waits, and returns the front of the queue after popping */ } (). - Abercrombie
@Abercrombie Somehow the StackOverflow folk didn't like it :) I guess the best way is to use this: std::unique_lock<std::mutex> lock(m_mutex); m_notifier.wait(lock, [this] { return !m_queue.empty(); }); const auto rxEntry = std::move(m_queue.front()); m_queue.pop(); lock.unlock(); - Fulmis
@CaglayanDOKME I was thinking about simply wrapping the whole logic in the lambda (and immediately invoking it). In the case presented in my answer it would be: MyData consumed_val = [](){ std::unique_lock<std::mutex> lck(mtx); cond_var.wait(lck, []() { return !q.empty(); }); MyData val = q.front(); q.pop(); return val; }(); - Abercrombie
@Abercrombie That would cause copying the data twice. In my solution, I wasn't even copying it. Stealing the resources that will be destroyed during pop is way more efficient. - Fulmis
@CaglayanDOKME my note number 5 in the answer (added earlier today) is about using move semantics, which can be applied with the lambda as well. - Abercrombie
@Abercrombie You were right and I was wrong! When we use a lambda function, NRVO happens and the returned value isn't actually copied twice. So, using a lambda with std::move is the best solution in this case. Thanks for the discussion. - Fulmis
@Abercrombie Your answer still has drawbacks. I can upvote it if you use lambda + std::move. - Fulmis
@CaglayanDOKME added a note about handling the case where MyData is not default constructible (with a lambda and std::move). I added it as a note and didn't replace the main code in the answer because it fits better with the original question here. - Abercrombie
