std::condition_variable - Wait for several threads to notify observer

My problem looks like this:

I've got an observer which holds a std::condition_variable and a std::mutex, and my worker thread objects have a pointer to the observer. Each time a worker thread finishes its job, it calls m_Observer->NotifyOne(), which then calls notify_one() on the condition_variable. What I want to do is start a bunch of worker threads, each with a different job and different (independent) data, and wait for all of them to signal the observer (using m_Observer->NotifyOne()), so that I can continue working based on the results of all the worker threads.

My observer looks like this:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

Here, _uNumOfNotifications is the number of worker threads I want to wait on.

Now each worker thread is supposed to run a simulation function, which does the actual work for one time step/data chunk, and then pause and wait until the observer notifies the worker to continue.

The thread function of a worker might look like this:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver() just calls m_Observer->NotifyOne().

Now the problem is that after some time the threads run into a deadlock somewhere, and the observer does not notify them to continue with the next time step. The problem is probably somewhere in the WaitForNotifications() function, but I'm not sure. At the moment I only have one worker thread, so _uNumOfNotifications = 1, but it still runs into the problem: the observer waits at m_ObserverCV.wait(Lock) and the worker waits at m_CV.wait(wait). I'm not even sure whether it's really a deadlock or something to do with the condition_variable, because I access it from several threads.
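One possible cause on the worker side: in the loop above, m_PauseSimulation is read outside m_WaitMutex. If another thread clears it and notifies m_CV between that read and the call to m_CV.wait(wait), the notification has no waiter, is lost, and the worker can sleep forever. Whether that actually happens here depends on Continue(), which isn't shown. Below is a minimal sketch of a pause/continue gate that avoids this, assuming the main thread grants one simulation step at a time; StepGate, Resume(), Stop() and WaitForStep() are hypothetical names, not part of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not part of the question's code): the main thread
// grants "permits" for simulation steps, and the worker consumes one permit
// per step. The permit count is protected by the same mutex the worker waits
// with, so a Resume() issued before the worker starts waiting is remembered
// instead of being lost.
class StepGate
{
public:
    // Main thread: allow the worker to run one more step.
    void Resume()
    {
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            ++m_uPermits;
        }
        m_CV.notify_one();
    }

    // Main thread: ask the worker to leave its loop.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            m_bRunning = false;
        }
        m_CV.notify_one();
    }

    // Worker thread: block until a step is permitted or the gate is stopped.
    // Returns true if the worker should simulate one step, false to exit.
    bool WaitForStep()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        // The predicate is re-checked under the mutex after every wake-up, so
        // spurious wake-ups and Resume() calls made before this point are handled.
        m_CV.wait(lock, [this] { return m_uPermits > 0 || !m_bRunning; });
        if (!m_bRunning)
            return false;
        assert(m_uPermits > 0);  // guaranteed by the predicate above
        --m_uPermits;
        return true;
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_CV;
    unsigned m_uPermits = 0;
    bool m_bRunning = true;
};
```

With such a gate, the worker loop reduces to while (gate.WaitForStep()) { Simulate(...); SignalObserver(); }, and there is no separate atomic flag that can change between the check and the wait.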

At this point I'd like to quote Ned Flanders' father: "We tried nothing and are all out of ideas!"

Thanks for your help. Every tip is appreciated.

Fabian

EDIT:

Thanks for all the helpful info and suggestions. I ended up implementing Michael's second idea, since I didn't find anything about std::barrier. Here is what I did:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

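    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        // reset both the counter and the flag; without resetting m_bNotified,
        // every later call would return immediately without waiting
        m_uNotificationCount = 0;
        m_bNotified = false;
    }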

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

In my "main" function, where MassSpringSystem and RigidBodySystem are my workers at the moment:

    //update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

The workers' thread function is the same as above, but this time SignalObserver() calls NotifyBarrier().

Everything works fine now. A simple yet powerful solution. Thanks!

Salenasalene answered 30/11, 2014 at 21:42
Comments:
There are probably more problems, but you do not need a mutex to protect the condition variable itself; its member functions are thread safe. (The mutex is still needed to protect the shared state you wait on.) - Owens
Your observer wait loop looks odd. You lock the mutex, then unconditionally set the notified state to false before waiting on the observer CV. Unless that state is set before the workers start, you're entering a race to see who can lock the mutex and set the state first, e.g.: (1) worker locks the mutex, (2) worker sets the state to true via NotifyOne, (3) observer enters the wait loop and blocks on the mutex, (4) worker releases the mutex, (5) observer obtains the mutex and wipes the notified state back to false, (6) observer begins waiting for a predicate change that has already happened. - Foolish
Also, if you want more specific help, please provide a minimal working program that reproduces the behavior you observe. Without being able to reproduce the problem, we can only speculate based on observations of the code. - Owens

You are trying to use condition variables in a way they are not meant to be used: in this case, you assume that you can count notifications. You can't. A notification sent while nobody is waiting is lost, and you are also counting the spurious wake-ups that the standard allows.

Instead, use a counter that each worker increments under a mutex when it finishes, and signal the condition variable only when the counter reaches the number of workers. The main thread keeps sleeping on the condition variable until the counter reaches the expected value. (Of course, the counter has to be checked while holding the same mutex that is used for incrementing it.) As far as I can see, replacing the mutex-protected counter with an atomic (without a mutex) seems impossible, as you can't atomically check the counter and go to sleep on the condition variable, so you will get a race condition without the mutex.
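The counter approach described above can be sketched as follows. This is a minimal illustration, not the asker's final class; CompletionCounter, Reset(), Arrive(), WaitAll() and the demo function SumOfWorkerIndices() are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: workers increment a counter under a mutex, and the
// waiting thread sleeps until the counter reaches the expected worker count.
class CompletionCounter
{
public:
    // Main thread: prepare a new round with 'expected' workers
    // (call only while no worker of the previous round is still arriving).
    void Reset(std::uint32_t expected)
    {
        std::lock_guard<std::mutex> lock(m_Mutex);
        m_uExpected = expected;
        m_uDone = 0;
    }

    // Worker thread: report that this worker has finished its job.
    void Arrive()
    {
        std::lock_guard<std::mutex> lock(m_Mutex);
        ++m_uDone;
        // Only the last worker needs to wake the waiting thread.
        if (m_uDone == m_uExpected)
            m_CV.notify_one();
    }

    // Main thread: block until every expected worker has called Arrive().
    void WaitAll()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        // The counter is checked under the mutex, so an Arrive() that happened
        // before this call is not missed, and spurious wake-ups are ignored.
        m_CV.wait(lock, [this] { return m_uDone >= m_uExpected; });
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_CV;
    std::uint32_t m_uExpected = 0;
    std::uint32_t m_uDone = 0;
};

// Usage sketch: each worker writes its index into its own result slot,
// then the main thread waits for all of them before reading the results.
int SumOfWorkerIndices(int numWorkers)
{
    assert(numWorkers > 0);
    CompletionCounter counter;
    counter.Reset(static_cast<std::uint32_t>(numWorkers));

    std::vector<int> results(numWorkers, 0);
    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i)
    {
        workers.emplace_back([&counter, &results, i] {
            results[i] = i;   // each worker touches only its own slot
            counter.Arrive();
        });
    }

    counter.WaitAll();  // every result is written once this returns

    int sum = 0;
    for (int r : results)
        sum += r;

    for (std::thread& t : workers)
        t.join();
    return sum;
}
```

Because each worker's write happens before its locked increment, and WaitAll() observes the final count under the same mutex, the main thread can safely read the results right after WaitAll() returns.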

Another synchronization primitive known from Boost.Thread is the barrier, which did not make it into C++11. You construct a barrier and pass it the number of worker threads plus one as its constructor argument. All worker threads should wait on the barrier at their end, and the main thread should wait on it after constructing the workers. All threads block on that barrier until all worker threads and the main thread are blocking, and are released at that moment. So once the main thread is released, you know that all workers have finished. This has one problem, though: no worker thread can finish (and free its associated management resources) until all worker threads have finished, which may or may not be a problem for you. This question presents an implementation of boost::barrier using C++11 threading facilities.
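For illustration, a reusable barrier can be built from a mutex, a condition variable and a generation counter. This is a minimal sketch under that assumption, not boost::barrier itself and not the implementation from the linked question; SimpleBarrier and CountFinishedWorkers() are hypothetical names. (C++20 later added std::barrier and std::latch in the <barrier> and <latch> headers.)

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable barrier in the spirit of boost::barrier:
// Wait() blocks until 'count' threads have called it, then releases
// all of them and resets itself for the next round.
class SimpleBarrier
{
public:
    explicit SimpleBarrier(std::size_t count)
        : m_uThreshold(count), m_uWaiting(0), m_uGeneration(0)
    {
        assert(count > 0);
    }

    void Wait()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        const std::size_t generation = m_uGeneration;
        if (++m_uWaiting == m_uThreshold)
        {
            // Last thread to arrive: start a new generation and wake everyone.
            ++m_uGeneration;
            m_uWaiting = 0;
            m_CV.notify_all();
            return;
        }
        // Waiting for the generation to change (rather than for a count)
        // keeps the barrier reusable and immune to spurious wake-ups.
        m_CV.wait(lock, [this, generation] { return generation != m_uGeneration; });
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_CV;
    const std::size_t m_uThreshold;
    std::size_t m_uWaiting;
    std::size_t m_uGeneration;
};

// Usage sketch matching the answer: N workers plus the main thread share
// one barrier constructed with N + 1 (numWorkers must not be negative).
int CountFinishedWorkers(int numWorkers)
{
    SimpleBarrier barrier(static_cast<std::size_t>(numWorkers) + 1);
    std::vector<int> finished(numWorkers, 0);
    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i)
    {
        workers.emplace_back([&barrier, &finished, i] {
            finished[i] = 1;  // the worker's "job"
            barrier.Wait();   // blocks until all workers and main have arrived
        });
    }

    barrier.Wait();  // returns only after every worker reached the barrier

    int count = 0;
    for (int f : finished)
        count += f;
    for (std::thread& t : workers)
        t.join();
    return count;
}
```

With C++20, the same one-shot pattern is usually expressed with std::latch (workers call count_down(), the main thread calls wait()), which avoids the drawback mentioned above: workers do not have to block until everyone is done.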

Potentate answered 30/11, 2014 at 23:1
Comments:
Thanks for your answer. Sadly, I was unable to find any reference to std::barrier. Is it possible that C++11 does not implement this yet? - Salenasalene
I fixed my answer. I currently work on a project that uses Boost.Thread, which has boost::barrier, and I expected this part to be in C++11, too. I was wrong. - Potentate
There's a proposal to add latches and barriers to the Concurrency TS, so they might be in C++17. - Tenstrike
