My problem looks like this:
I have an observer that holds a std::condition_variable and a std::mutex, and each of my worker thread objects has a pointer to the observer. Each time a worker thread finishes its job, it calls m_Observer->NotifyOne(), which in turn calls notify_one() on the condition_variable. What I want to do is start a bunch of worker threads, each with a different job and different (independent) data, and wait for all of them to signal the observer (using m_Observer->NotifyOne()) so that I can continue working with the results of all the worker threads.
My observer looks like this:
class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);
            if (m_bNotified)
                ++uNotifyCount;
        }
    }
}; // IAsyncObserver
where _uNumOfNotifications is the number of worker threads I want to wait on.
Each worker thread is supposed to run a simulation function that does the actual work for one timestep/data chunk and then pauses and waits until the observer notifies the worker to continue.
The thread function of a worker might look like this:
do
{
    // suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }
    // lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);
        // update simulation
        Simulate(static_cast<float>(m_fDeltaTime));
        m_PauseSimulation.store(true);
    }
    // notify the physics manager that work is done here
    SignalObserver();
} while (m_RunSimulation.load());
SignalObserver() just calls m_Observer->NotifyOne().
Now the problem is that after some time the threads run into a deadlock somewhere, and the observer does not notify them to continue with the next time step. The problem is probably somewhere in the WaitForNotifications() function, but I'm not sure. At the moment I only have one worker thread, so _uNumOfNotifications = 1, but it still ends up waiting at both m_ObserverCV.wait(Lock) and m_CV.wait(wait). I'm not even sure whether it's really a deadlock or a problem with the condition_variable, because I access it from several threads.
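One plausible cause (an assumption, not verified against the full program): a condition_variable does not remember notifications. If NotifyOne() runs before the observer reaches m_ObserverCV.wait(Lock), that wake-up is lost, and WaitForNotifications() also resets m_bNotified to false right before waiting. The minimal sketch below keeps the state in a counter and waits with a predicate, so the order of notify and wait no longer matters. CountingSignal, Notify(), Wait(), Pending() and NotifyBeforeWait() are hypothetical names and not part of the code above.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical helper: the counter, not the notify call itself, carries the
// information, so a notification that arrives early is not lost.
class CountingSignal
{
public:
    void Notify()
    {
        {
            std::lock_guard<std::mutex> Lock(m_Mutex);
            ++m_uCount;
        }
        m_CV.notify_one();
    }

    // Blocks until _uExpected notifications have arrived, then consumes them.
    void Wait(std::uint32_t _uExpected)
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        // The predicate is checked before sleeping and after every wake-up,
        // which also covers spurious wake-ups.
        m_CV.wait(Lock, [&] { return m_uCount >= _uExpected; });
        m_uCount -= _uExpected;
    }

    std::uint32_t Pending()
    {
        std::lock_guard<std::mutex> Lock(m_Mutex);
        return m_uCount;
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_CV;
    std::uint32_t m_uCount = 0;
};

// Usage sketch: both notifications happen BEFORE the wait, yet Wait() returns.
inline std::uint32_t NotifyBeforeWait()
{
    CountingSignal Signal;
    Signal.Notify();
    Signal.Notify();
    assert(Signal.Pending() == 2);
    Signal.Wait(2); // returns immediately because the predicate is already true
    return Signal.Pending();
}
```

With a plain wait(Lock) and a flag that is cleared before waiting, the same call order would block forever.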
At this point I'd like to quote Ned Flanders' father: "We tried nothing and are all out of ideas!"
Thanks for your help. Every tip is appreciated.
Fabian
EDIT:
Thanks for all the helpful info and suggestions. I ended up implementing Michael's second idea, since I didn't find anything about std::barrier. So here is what I did:
class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;
    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        // reset the flag as well as the counter; otherwise the wait in the
        // next timestep would return immediately
        m_bNotified = false;
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }
}; // IAsyncObserver
In my "main" function (MassSpringSystem and RigidBodySystem are my workers at the moment):
// update systems here:
{
    SetBarrier(m_uTotalNotifyCount);
    { // start MassSpringSystems
        std::lock_guard<std::mutex> lock(m_LockMutex);
        for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
        {
            MSS->SetDeltaTime(fDeltaTime);
            MSS->Continue();
        }
    }
    // ATTENTION this system works directly on the m_OctreeEntities!
    { // start RigidBodySystems
        m_RigidBodySystem.SetDeltaTime(fDeltaTime);
        m_RigidBodySystem.AddData(m_RigidBodies);
        m_RigidBodySystem.Continue();
    }
    // wait for all systems to finish -> till they call SignalObserver
    WaitForNotifications();
}
The thread function of the workers is the same as above, but this time SignalObserver() calls NotifyBarrier().
Everything works fine now. A simple yet powerful solution. Thanks!
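A note on the worker side, as a hedged sketch only: the worker loop waits with m_CV.wait(wait) and no predicate, so it could still miss a Continue() that arrives before the worker starts waiting. One way to make the pause/resume handshake independent of that ordering is to keep the pause flag under the same mutex as the condition variable and wait with a predicate. PausableWorker, WaitUntilPaused(), Steps() and RunThreeSteps() are hypothetical names, the step counter stands in for Simulate(), and the real Continue() implementation is not shown in the post, so this is an assumption about how it could look.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical worker: runs exactly one step per Continue() call, then pauses.
class PausableWorker
{
public:
    PausableWorker() : m_Thread(&PausableWorker::Run, this) {}

    ~PausableWorker()
    {
        {
            std::lock_guard<std::mutex> Lock(m_Mutex);
            m_bRun = false;
        }
        m_CV.notify_all();
        m_Thread.join();
    }

    // Resume the worker for one step. Safe even if the worker is not waiting yet,
    // because the flag is changed under the mutex and re-checked by the predicate.
    void Continue()
    {
        {
            std::lock_guard<std::mutex> Lock(m_Mutex);
            assert(m_bPaused && "Continue() called while a step is still running");
            m_bPaused = false;
        }
        m_CV.notify_all();
    }

    // Block until the worker has finished its step and paused again.
    void WaitUntilPaused()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_CV.wait(Lock, [this] { return m_bPaused; });
    }

    std::uint32_t Steps()
    {
        std::lock_guard<std::mutex> Lock(m_Mutex);
        return m_uSteps;
    }

private:
    void Run()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        for (;;)
        {
            // Sleep while paused; wake up on Continue() or on shutdown.
            m_CV.wait(Lock, [this] { return !m_bPaused || !m_bRun; });
            if (!m_bRun)
                return;
            ++m_uSteps;        // stand-in for Simulate(dt)
            m_bPaused = true;  // pause again after one step
            m_CV.notify_all(); // tell the controlling thread the step is done
        }
    }

    std::mutex m_Mutex;
    std::condition_variable m_CV;
    bool m_bPaused = true;
    bool m_bRun = true;
    std::uint32_t m_uSteps = 0;
    std::thread m_Thread; // declared last so the other members exist before it starts
};

// Usage sketch: drive the worker through three timesteps from the calling thread.
inline std::uint32_t RunThreeSteps()
{
    PausableWorker Worker;
    for (int i = 0; i < 3; ++i)
    {
        Worker.Continue();
        Worker.WaitUntilPaused();
    }
    return Worker.Steps();
}
```

In the setup from the post, the "step is done" signal would go to the observer through NotifyBarrier() instead of WaitUntilPaused(); the sketch folds both directions into one condition variable only to stay self-contained.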