- Rule 1: Access a shared variable only while holding the mutex that protects it.
- Rule 2: condition_variable::wait should check a condition.
  - The condition should be a shared variable protected by the mutex that you pass to condition_variable::wait.
  - The way to check the condition is to wrap the call to wait in a while loop, or to use the 2-argument overload of wait (which is equivalent to the while-loop version).
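To make the last point concrete, here is a minimal sketch of both ways to check the condition. The Mailbox struct and the function names are made up for this illustration and are not part of your code; the only library facts relied on are that wait(lock) may wake up spuriously and that wait(lock, pred) keeps waiting until pred() returns true.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical shared state for illustration; not from the original question.
struct Mailbox {
    std::mutex mutex;            // protects `ready` and `value`
    std::condition_variable cv;
    bool ready = false;
    int value = 0;
};

// Explicit while-loop form: re-check the condition after every wakeup.
int receiveWithLoop(Mailbox& box)
{
    std::unique_lock<std::mutex> lock(box.mutex);
    while (!box.ready)
        box.cv.wait(lock);
    box.ready = false;
    return box.value;
}

// Two-argument overload: wait(lock, pred) loops until pred() returns true.
int receiveWithPredicate(Mailbox& box)
{
    std::unique_lock<std::mutex> lock(box.mutex);
    box.cv.wait(lock, [&box] { return box.ready; });
    box.ready = false;
    return box.value;
}

// Writes the shared variables only while holding the mutex (the first rule).
void post(Mailbox& box, int v)
{
    {
        std::lock_guard<std::mutex> lock(box.mutex);
        box.value = v;
        box.ready = true;
    }
    box.cv.notify_one();
}

// Small driver: post from another thread and receive on this one.
int roundTrip(int v, bool usePredicate)
{
    Mailbox box;
    std::thread sender(post, std::ref(box), v);
    int got = usePredicate ? receiveWithPredicate(box) : receiveWithLoop(box);
    sender.join();
    assert(!box.ready);  // safe to read without the lock: the sender has been joined
    return got;
}
```

Both receive functions behave the same whether the sender runs before or after the receiver reaches wait, because the condition is always checked under the mutex before sleeping.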
Note: These rules aren't strictly necessary if you truly understand what the hardware is doing. However, these problems get complicated quickly, even with simple data structures, and it will be easier to prove that your algorithm works correctly if you follow them.
Your Q and Q_buf are shared variables. Due to Rule 1, I would prefer to have them as local variables declared in the function that uses them (consume() and produce(), respectively). There will be 1 shared buffer that will be protected by a mutex. The producer will add to its local buffer. When that buffer is full, it acquires the mutex and pushes the local buffer to the shared buffer. It then waits for the consumer to accept this buffer before producing more data.
The consumer waits for this shared buffer to "arrive", then it acquires the mutex and replaces its empty local buffer with the shared buffer. Then it signals to the producer that the buffer has been accepted so it knows to start producing again.
Semantically, I don't see a reason to use swap over move, since in every case one of the containers is empty anyway. Maybe you want to use swap because you know something about the underlying memory. You can use whichever you want and it will be fast and work the same (at least algorithmically).
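A small sketch of the two hand-off styles, assuming the receiving vector is empty as it is in this algorithm. The function names are hypothetical. The clear() after the move is a precaution I am adding here: as far as I know the standard only promises that a moved-from vector is left in a valid but unspecified state, even though common implementations leave it empty.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hand a full buffer to an empty one using move assignment.
void handOffWithMove(std::vector<int>& from, std::vector<int>& to)
{
    assert(to.empty());    // precondition of this sketch
    to = std::move(from);
    from.clear();          // make the emptiness of `from` explicit
}

// Hand a full buffer to an empty one using swap.
// `from` ends up holding whatever `to` held before, i.e. nothing.
void handOffWithSwap(std::vector<int>& from, std::vector<int>& to)
{
    assert(to.empty());    // precondition of this sketch
    to.swap(from);
}
```

Either way the elements are not copied one by one; only the vectors' internal pointers change hands.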
This problem can be done with 1 condition variable, but it may be a little easier to think about if you use 2.
Here's what I came up with. Tested on Visual Studio 2017 (15.6.7) and GCC 5.4.0. I don't need to be credited or anything (it's such a simple piece), but legally I have to say that I offer no warranties whatsoever.
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::vector<int> g_deliveryBuffer;
bool g_quit = false;
std::mutex g_mutex; // protects g_deliveryBuffer and g_quit
std::condition_variable g_producerDeliver;
std::condition_variable g_consumerAccepted;

// consumer
void consume()
{
    // local buffer (replaces your Q)
    std::vector<int> consumerBuffer;
    while (true)
    {
        if (consumerBuffer.empty())
        {
            std::unique_lock<std::mutex> lock(g_mutex);
            while (g_deliveryBuffer.empty() && !g_quit) // if we beat the producer, wait for them to push to the delivery buffer
                g_producerDeliver.wait(lock);
            if (g_quit)
                break;
            consumerBuffer = std::move(g_deliveryBuffer); // get the buffer
        }
        g_consumerAccepted.notify_one(); // notify the producer that the buffer has been accepted
        // for-loop to process the elements in consumerBuffer
        // ...
        consumerBuffer.clear();
        // ...
    }
}

// producer
void produce()
{
    // local buffer (replaces your Q_buf)
    std::vector<int> producerBuffer;
    while (true)
    {
        // for-loop to fill up producerBuffer
        // ...
        producerBuffer = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        // ...
        // once producerBuffer is full, deliver it and wait until the consumer has accepted it
        { // scope is for lock
            std::unique_lock<std::mutex> lock(g_mutex);
            g_deliveryBuffer = std::move(producerBuffer); // ok to push to the delivery buffer. it is guaranteed to be empty
            g_producerDeliver.notify_one();
            while (!g_deliveryBuffer.empty() && !g_quit)
                g_consumerAccepted.wait(lock); // wait for consumer to signal for more data
            if (g_quit)
                break;
            // We will never reach this point if the buffer is not empty.
        }
    }
}

int main()
{
    // spawn threads
    std::thread consumerThread(consume);
    std::thread producerThread(produce);
    // run for 5 seconds
    std::this_thread::sleep_for(std::chrono::seconds(5));
    // signal that it's time to quit
    {
        std::lock_guard<std::mutex> lock(g_mutex);
        g_quit = true;
    }
    // one of the threads may be sleeping
    g_consumerAccepted.notify_one();
    g_producerDeliver.notify_one();
    consumerThread.join();
    producerThread.join();
    return 0;
}
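For comparison, here is a rough sketch of the variant with 1 condition variable that I mentioned above. It is not the code I tested; the Channel struct, the function names, the fixed batch contents, and the done flag (used instead of a timed quit so that the run is finite) are all assumptions made for this illustration. Both threads wait on the same condition variable with different predicates, so it uses notify_all to make sure the right thread gets to re-check its condition.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared state for the single-condition-variable variant.
struct Channel {
    std::mutex mutex;                 // protects `delivery` and `done`
    std::condition_variable changed;  // signalled whenever either of them changes
    std::vector<int> delivery;
    bool done = false;
};

void produceBatches(Channel& ch, int batches)
{
    for (int b = 0; b < batches; ++b) {
        std::vector<int> local = {1, 2, 3, 4};  // fill the local buffer without the lock
        std::unique_lock<std::mutex> lock(ch.mutex);
        // wait until the consumer has taken the previous batch
        ch.changed.wait(lock, [&ch] { return ch.delivery.empty(); });
        ch.delivery = std::move(local);
        ch.changed.notify_all();
    }
    std::lock_guard<std::mutex> lock(ch.mutex);
    ch.done = true;
    ch.changed.notify_all();
}

long consumeAll(Channel& ch)
{
    long sum = 0;
    std::vector<int> local;
    while (true) {
        {
            std::unique_lock<std::mutex> lock(ch.mutex);
            ch.changed.wait(lock, [&ch] { return !ch.delivery.empty() || ch.done; });
            if (ch.delivery.empty())
                break;                // producer is done and nothing is left
            assert(local.empty());
            local.swap(ch.delivery);  // delivery becomes empty again
        }
        ch.changed.notify_all();      // tell the producer the batch was accepted
        for (int x : local)
            sum += x;                 // process the batch outside the lock
        local.clear();
    }
    return sum;
}

// Runs one producer and one consumer and returns the sum the consumer saw.
long runChannel(int batches)
{
    Channel ch;
    long sum = 0;
    std::thread consumer([&] { sum = consumeAll(ch); });
    std::thread producer(produceBatches, std::ref(ch), batches);
    producer.join();
    consumer.join();
    return sum;
}
```

The trade-off is that every notification wakes both kinds of waiter, and each one has to re-check its own predicate. With 2 condition variables each thread is only woken for the event it cares about, which is why I find that version easier to reason about.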
The unique_lock<> declarations are not calls. They instantiate objects that wrap a mutex. These objects ensure that the appropriate unlock is called when they go out of scope. See en.cppreference.com/w/cpp/thread/unique_lock and en.wikipedia.org/wiki/Resource_acquisition_is_initialization – Kimbro
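A short sketch of that scope-based behaviour, using a hypothetical Counter type that is not part of the answer's code. The point is only that every path out of the function destroys the lock object, which releases the mutex.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical counter guarded by its own mutex; the names are illustrative only.
struct Counter {
    std::mutex mutex;  // protects `value`
    int value = 0;
};

// Adds `amount` unless it is negative. Both return paths release the mutex,
// because `lock` is destroyed whenever the function is left.
bool addIfNonNegative(Counter& c, int amount)
{
    std::unique_lock<std::mutex> lock(c.mutex);  // the constructor locks the mutex
    assert(lock.owns_lock());
    if (amount < 0)
        return false;  // the destructor unlocks here
    c.value += amount;
    return true;       // ... and here
}

// Reads the value under the same mutex.
int currentValue(Counter& c)
{
    std::lock_guard<std::mutex> lock(c.mutex);
    return c.value;
}
```

Calling addIfNonNegative twice in a row on the same Counter only works because the first call's lock object released the mutex when it went out of scope.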