Using std::mutex, std::condition_variable and std::unique_lock
I'm having some trouble understanding condition variables and their use with mutexes, and I hope the community can help. Please note that I come from a Win32 background, so I'm used to CRITICAL_SECTION, HANDLE, SetEvent, WaitForMultipleObjects, etc.

Here's my first attempt at concurrency using the C++11 standard library. It's a modified version of a program example found here.

#include <condition_variable>
#include <mutex>
#include <algorithm>
#include <thread>
#include <queue>
#include <chrono>
#include <iostream>


int _tmain(int argc, _TCHAR* argv[])
{   
    std::queue<unsigned int>    nNumbers;

    std::mutex                  mtxQueue;
    std::condition_variable     cvQueue;
    bool                        m_bQueueLocked = false;

    std::mutex                  mtxQuit;
    std::condition_variable     cvQuit;
    bool                        m_bQuit = false;


    std::thread thrQuit(
        [&]()
        {
            using namespace std;            

            this_thread::sleep_for(chrono::seconds(7));

            // set event by setting the bool variable to true
            // then notifying via the condition variable
            m_bQuit = true;
            cvQuit.notify_all();
        }
    );

    std::thread thrProducer(
        [&]()
        {           
            using namespace std;

            int nNum = 0;
            unique_lock<mutex> lock( mtxQuit );

            while( ( ! m_bQuit ) && 
                   ( cvQuit.wait_for( lock, chrono::milliseconds(10) ) == cv_status::timeout ) )
            {
                nNum ++;

                unique_lock<mutex> qLock(mtxQueue);
                cout << "Produced: " << nNum << "\n";
                nNumbers.push( nNum );              
            }
        }
    );

    std::thread thrConsumer(
        [&]()
        {
            using namespace std;            

            unique_lock<mutex> lock( mtxQuit );

            while( ( ! m_bQuit ) && 
                    ( cvQuit.wait_for( lock, chrono::milliseconds(10) ) == cv_status::timeout ) )
            {
                unique_lock<mutex> qLock(mtxQueue);
                if( nNumbers.size() > 0 )
                {
                    cout << "Consumed: " << nNumbers.front() << "\n";
                    nNumbers.pop();
                }               
            }
        }
    );

    thrQuit.join();
    thrProducer.join();
    thrConsumer.join();

    return 0;
}

A few questions about this.

I've read that "any thread that intends to wait on std::condition_variable must acquire an std::unique_lock first."

So I've got a {quit mutex, condition variable & bool} to indicate when quit has been signalled. The producer and consumer threads must each acquire a std::unique_lock like so:

std::unique_lock<std::mutex> lock(mtxQuit);

This is confusing the hell out of me. Won't this lock the quit mutex in the first thread, thereby blocking the second? And if that's true, then how does the first thread release the lock, so that the other thread can begin?

Another question: If I change the wait_for() call to wait for zero seconds, that thread is starved. Can someone explain? I'd expect it not to block before executing the while loop (am I correct to assume that a no_timeout is recv'd instead of a timeout?).

How can I call a wait_for() and specify a zero time, so that the wait_for() call doesn't block, instead it just checks the condition and continues?

I'd also be interested to hear about good references on this subject.

Marilou answered 14/11, 2012 at 0:3 Comment(0)

Won't this lock the quit mutex in the first thread, thereby blocking the second?

Yes.

And if that's true, then how does the first thread release the lock, so that the other thread can begin?

When you wait on a condition_variable it unlocks the lock that you pass it, so in

cvQuit.wait_for( lock, chrono::milliseconds(10) )

the condition variable will call lock.unlock() and then block for up to 10 ms. This happens atomically, so there's no window between unlocking the mutex and blocking where the condition could become ready and you'd miss it. Before wait_for() returns, it locks the mutex again.

When the mutex is unlocked it allows the other thread to acquire the lock on it.
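
To make the unlock-while-waiting behaviour concrete, here is a minimal sketch. The function name count_concurrent_waiters and all variable names are made up for illustration and are not from the question's code. Two threads lock the same mutex and then wait; the calling thread can only lock that mutex and observe both of them as waiting because wait() released it on their behalf.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo: returns how many threads were blocked in wait()
// on the same mutex at the moment the caller managed to lock it.
int count_concurrent_waiters()
{
    std::mutex mtx;
    std::condition_variable cvReady;    // the waiter threads sleep on this
    std::condition_variable cvWaiting;  // the calling thread sleeps on this
    bool ready = false;                 // condition the waiters wait for
    int waiting = 0;                    // waiters that have locked mtx so far

    auto waiter = [&] {
        std::unique_lock<std::mutex> lock(mtx);   // mtx is locked here
        ++waiting;
        cvWaiting.notify_one();
        // wait() unlocks mtx while blocked and locks it again before returning
        cvReady.wait(lock, [&] { return ready; });
    };

    std::thread a(waiter);
    std::thread b(waiter);

    int seen = 0;
    {
        // This lock can only succeed while the waiters are not holding mtx,
        // i.e. before they start or while they are blocked inside wait().
        std::unique_lock<std::mutex> lock(mtx);
        cvWaiting.wait(lock, [&] { return waiting == 2; });
        seen = waiting;
        ready = true;                             // written under the mutex
    }
    cvReady.notify_all();

    a.join();
    b.join();
    return seen;
}
```

If wait() kept the mutex locked, the second waiter could never increment the counter and the call would hang instead of returning 2.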

Another question: If I change the wait_for() call to wait for zero seconds, that thread is starved. Can someone explain?

I would expect the other thread to be starved, because the mutex is not unlocked long enough for the other thread to lock it.

am I correct to assume that a no_timeout is recv'd instead of a timeout?

No, if the time duration passes without the condition becoming ready then it "times out" even after zero seconds.
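
A small sketch of that point, assuming nobody notifies the condition variable (the helper name zero_wait_times_out is hypothetical):

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: waits for zero time on a condition variable
// that no other thread ever notifies, and reports whether the result
// was cv_status::timeout.
bool zero_wait_times_out()
{
    std::mutex mtx;
    std::condition_variable cv;
    std::unique_lock<std::mutex> lock(mtx);

    // The relative timeout has already expired when the call is made,
    // so the status reported is timeout rather than no_timeout.
    return cv.wait_for(lock, std::chrono::milliseconds(0)) == std::cv_status::timeout;
}
```

In the question's loop this means the `== cv_status::timeout` test keeps succeeding, so the loop body keeps running with only a very brief unlock in between.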

How can I call a wait_for() and specify a zero time, so that the wait_for() call doesn't block, instead it just checks the condition and continues?

Don't use a condition variable! If you don't want to wait for a condition to become true, don't wait on a condition variable! Just test m_bQuit and proceed. (Aside, why are your booleans called m_bXxx? They're not members, so the m_ prefix is misleading, and the b prefix looks like that awful MS habit of Hungarian notation ... which stinks.)

I'd also be interested to hear about good references on this subject.

The best reference is Anthony Williams's C++ Concurrency In Action, which covers the entire C++11 atomics and thread libraries in detail, as well as the general principles of multithreaded programming. One of my favourite books on the subject is Butenhof's Programming with POSIX Threads, which is specific to Pthreads, but the C++11 facilities map very closely to Pthreads, so it's easy to transfer the information from that book to C++11 multithreading.

N.B. In thrQuit you write to m_bQuit without protecting it with a mutex. Since nothing prevents another thread from reading it at the same time as that write, it's a data race, i.e. undefined behaviour. The write to the bool must either be protected by a mutex, or the variable must be an atomic type, e.g. std::atomic<bool>.
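
As a rough sketch of the mutex-protected option: the QuitSignal class and the demo_quit_signal function below are hypothetical names introduced for illustration, not part of the question's code. The flag is only ever read or written while the mutex is held, and the predicate overload of wait_for() re-checks it after every wakeup.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical wrapper bundling the quit flag with its mutex and
// condition variable, so every access to the flag is synchronized.
class QuitSignal
{
public:
    // Set the flag while holding the mutex, then wake all waiters.
    void request()
    {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            quit_ = true;
        }
        cv_.notify_all();
    }

    // Block for at most `timeout`; returns true if quit has been requested.
    bool wait_for_quit(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mtx_);
        return cv_.wait_for(lock, timeout, [this] { return quit_; });
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    bool quit_ = false;
};

// Checks the flag before and after another thread requests quit.
bool demo_quit_signal()
{
    QuitSignal signal;
    bool before = signal.wait_for_quit(std::chrono::milliseconds(0)); // not requested yet

    std::thread quitter([&] { signal.request(); });
    // Returns as soon as the request arrives, well before the 10 s limit.
    bool after = signal.wait_for_quit(std::chrono::seconds(10));
    quitter.join();

    return !before && after;
}
```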

I don't think you need two mutexes; the second just adds contention. Since you never release mtxQuit except while waiting on the condition_variable, there is no point in having the second mutex: mtxQuit already ensures that only one thread can enter the critical section at once.

Reductive answered 14/11, 2012 at 0:53 Comment(1)
Thanks for that, it's really cleared up my questions. I was looking at the Anthony Williams book, but wondered if there were more. Great answer, thanks! - Marilou

If you want to check something and continue regardless of whether it's true or not (possibly doing two different things), then a condition variable is the wrong thing to use. A condition variable is a low-level primitive for waiting on some condition associated with a locked data structure, without having to spin acquiring and releasing the lock. The canonical example is a queue: you have a lock guarding access to the queue and two condition variables (queue not empty and queue not full). To push something onto the queue, you acquire the lock, check that it's not full, wait on the not-full condvar if it is, push the value onto the queue, signal the not-empty condvar (since it's no longer empty) and release the lock. The pop operation is similar.
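
The queue described above might look roughly like this. It is a sketch under assumptions: the BoundedQueue class, its int element type and the sum_through_queue helper are all invented for illustration.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue: one mutex, two condition variables.
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(int value)
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // Sleep (with mtx_ released) until there is room.
        notFull_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(value);
        notEmpty_.notify_one();   // the queue is no longer empty
    }                             // lock released here

    int pop()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // Sleep (with mtx_ released) until there is something to take.
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        int value = items_.front();
        items_.pop();
        notFull_.notify_one();    // the queue is no longer full
        return value;
    }

private:
    std::mutex mtx_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
    std::queue<int> items_;
    std::size_t capacity_;
};

// Pushes 1..count from a producer thread and sums what the caller pops.
int sum_through_queue(int count)
{
    BoundedQueue queue(2);        // small capacity forces both kinds of waiting
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i)
            queue.push(i);
    });

    int sum = 0;
    for (int i = 0; i < count; ++i)
        sum += queue.pop();

    producer.join();
    return sum;
}
```

Neither side ever polls: a thread that cannot proceed blocks inside wait() until the other side changes the queue and signals.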

So in your case, you have a simple queue that can't be full, so you need one lock and one condvar for it. That makes perfect sense. But then you have a 'quit' flag that you want to trigger completion. You don't want to wait for the quit flag to be set; you want to actually do work until it is set, so a condvar really makes no sense here. Yes, you COULD come up with a convoluted arrangement that would make it work, but that would be confusing, as it's not using a condition variable as a condition variable.

It makes more sense (and is clearer) to just use a std::atomic<bool> for the quit flag. Then you just initialize it to false, set it to true in your quit thread, and check it in the other threads.
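
A minimal sketch of that approach follows. The function work_until_quit and its counter are hypothetical; here the calling thread plays the role of the quit thread and sets the flag once the worker has done a given amount of work, rather than after a fixed delay.

```cpp
#include <atomic>
#include <thread>

// Hypothetical demo: a worker loops until the quit flag is set.
// Returns the number of iterations the worker completed.
long work_until_quit(long min_iterations)
{
    std::atomic<bool> quit(false);   // initialized to false
    std::atomic<long> done(0);       // work counter shared between threads

    std::thread worker([&] {
        while (!quit.load()) {       // just test the flag and proceed
            ++done;                  // stand-in for producing or consuming
            std::this_thread::yield();
        }
    });

    // Acts as the "quit thread": let the worker run for a while...
    while (done.load() < min_iterations)
        std::this_thread::yield();
    quit.store(true);                // ...then set the flag to true

    worker.join();
    return done.load();
}
```

No mutex or condition variable is involved for the flag: the worker never blocks on it, it only checks it between units of work.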

Githens answered 14/11, 2012 at 0:53 Comment(0)
