thread sync using mutex and condition variable

I'm trying to implement a multi-threaded job with a producer and a consumer. What I want is this: when the consumer has finished its data, it notifies the producer so that the producer provides new data.

The tricky part is that, in my current implementation, the producer and the consumer both notify each other and wait for each other, and I don't know how to implement this part correctly.

For example, see the code below,

mutex m;
condition_variable cv;

vector<int> Q;  // this is the queue the consumer will consume
vector<int> Q_buf;  // this is a buffer Q into which producer will fill new data directly

// consumer
void consume() {
  while (1) {
    if (Q.size() == 0) {  // when consumer finishes data
      unique_lock<mutex> lk(m);
      // how to notify producer to fill up the Q?
      ...
      cv.wait(lk);
    }

    // for-loop to process the elems in Q
    ...
  }
}

// producer
void produce() {
  while (1) {
    // for-loop to fill up Q_buf
    ...

    // once Q_buf is fully filled, wait until consumer asks to give it a full Q
    unique_lock<mutex> lk(m);
    cv.wait(lk);
    Q.swap(Q_buf);  // replace the empty Q with the full Q_buf
    cv.notify_one();
  }
}

I'm not sure whether the code above, using mutex and condition_variable, is the right way to implement my idea. Please give me some advice!

Kasey answered 7/5, 2018 at 14:17 Comment(2)
The structure looks incomplete to me; you are missing unlock calls in both. Also, hopefully you are removing elements from the Q in the consumer, otherwise the size will never be 0. In general, though, it is better to use a predicate in the wait. - Wilda
@VamsidharReddyGaddam the lines with unique_lock<> are not calls. They instantiate objects which wrap a mutex. These objects ensure that the appropriate unlock is called when they go out of scope. See en.cppreference.com/w/cpp/thread/unique_lock and en.wikipedia.org/wiki/Resource_acquisition_is_initialization - Kimbro
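
As a minimal sketch of the scope-based unlocking described in the comment above (the helper name `lock_twice_in_sequence` is made up for this illustration), the same mutex can be taken in two consecutive scopes without any explicit unlock call:

```cpp
#include <cassert>
#include <mutex>

std::mutex m;

// Hypothetical helper: acquires the same mutex in two consecutive scopes.
// If the first unique_lock did not unlock in its destructor, the second
// acquisition could never succeed.
int lock_twice_in_sequence() {
    int acquired = 0;
    {
        std::unique_lock<std::mutex> lk(m);  // constructor locks m
        assert(lk.owns_lock());
        ++acquired;
    }                                        // lk destroyed here: m is unlocked
    {
        std::unique_lock<std::mutex> lk(m);  // succeeds because m was released
        assert(lk.owns_lock());
        ++acquired;
    }
    return acquired;
}
```

Calling `lock_twice_in_sequence()` should return 2, since both scopes manage to take the lock.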
C
9

The code incorrectly assumes that vector<int>::size() and vector<int>::swap() are atomic. They are not.

Also, spurious wakeups must be handled by a while loop (or by the condition_variable::wait overload that takes a predicate).

Fixes:

mutex m;
condition_variable cv;
vector<int> Q;

// consumer
void consume() {
    while(1) {
        // Get the new elements.
        vector<int> new_elements;
        {
            unique_lock<mutex> lk(m);
            while(Q.empty())
                cv.wait(lk);
            new_elements.swap(Q);
        }
        // for-loop to process the elems in new_elements
    }
}

// producer
void produce() {
    while(1) {
        vector<int> new_elements;
        // for-loop to fill up new_elements

        // publish new_elements
        {
            unique_lock<mutex> lk(m);
            Q.insert(Q.end(), new_elements.begin(), new_elements.end());
            cv.notify_one();
        }
    }
}
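
For comparison, here is a sketch of the same hand-off using the wait overload that takes a predicate. It is not part of the original fix: the driver function `run_handoff` and the `done` flag are assumptions added only so that the example terminates.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical driver: one producer publishes `batches` batches of {0, 1, 2},
// one consumer drains them. Returns how many elements the consumer saw.
int run_handoff(int batches) {
    std::mutex m;
    std::condition_variable cv;
    std::vector<int> Q;   // shared, guarded by m
    bool done = false;    // shared, guarded by m (added for shutdown)
    int consumed = 0;     // touched only by the consumer thread

    std::thread consumer([&] {
        for (;;) {
            std::vector<int> new_elements;
            {
                std::unique_lock<std::mutex> lk(m);
                // Equivalent to: while (Q.empty() && !done) cv.wait(lk);
                cv.wait(lk, [&] { return !Q.empty() || done; });
                if (Q.empty()) return;  // done, and nothing left to drain
                new_elements.swap(Q);
            }
            // Process outside the lock.
            consumed += static_cast<int>(new_elements.size());
        }
    });

    std::thread producer([&] {
        for (int i = 0; i < batches; ++i) {
            std::vector<int> new_elements{0, 1, 2};
            std::lock_guard<std::mutex> lk(m);
            Q.insert(Q.end(), new_elements.begin(), new_elements.end());
            cv.notify_one();
        }
        std::lock_guard<std::mutex> lk(m);
        done = true;
        cv.notify_one();
    });

    producer.join();
    consumer.join();
    assert(Q.empty());  // everything published was drained
    return consumed;
}
```

The predicate is re-checked after every wakeup, so a spurious wakeup simply goes back to waiting.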
Catholicity answered 7/5, 2018 at 15:11 Comment(0)
R
4

Maybe this is close to what you want to achieve. I used two condition variables so that producers and consumers can notify each other, and introduced a variable that records whose turn it is:

#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template<typename T>
class ReaderWriter {
    private:
        std::vector<std::thread> readers;
        std::vector<std::thread> writers;
        std::condition_variable readerCv, writerCv;
        std::queue<T> data;
        std::mutex mtx;  // one mutex guards both `turn` and `data`
        size_t noReaders, noWriters;
        enum class Turn { WRITER_TURN, READER_TURN };
        Turn turn;
        void reader() {
            while (1) {
                {
                    std::unique_lock<std::mutex> lk(mtx);
                    while (turn != Turn::READER_TURN) {
                        readerCv.wait(lk);
                    }
                    std::cout << "Thread : " << std::this_thread::get_id() << " consumed " << data.front() << std::endl;
                    data.pop();
                    if (data.empty()) {
                        turn = Turn::WRITER_TURN;
                        writerCv.notify_one();
                    }
                }
            }
        }

        void writer() {
            while (1) {
                {
                    std::unique_lock<std::mutex> lk(mtx);
                    while (turn != Turn::WRITER_TURN) {
                        writerCv.wait(lk);
                    }
                    srand(time(NULL));
                    int random_number = std::rand();
                    data.push(random_number);
                    std::cout << "Thread : " << std::this_thread::get_id() << " produced " << random_number << std::endl;
                    turn = Turn::READER_TURN;
                }
                readerCv.notify_one();
            }
        }

    public:
        ReaderWriter(size_t noReadersArg, size_t noWritersArg) : noReaders(noReadersArg), noWriters(noWritersArg), turn(ReaderWriter::Turn::WRITER_TURN) {
        }

        void run() {
            int noReadersArg = noReaders, noWritersArg = noWriters;
            while (noReadersArg--) {
                readers.emplace_back(&ReaderWriter::reader, this);
            }

            while (noWritersArg--) {
                writers.emplace_back(&ReaderWriter::writer, this);
            }
        }

        ~ReaderWriter() {
            for (auto& r : readers) {
                r.join();
            }
            for (auto& w : writers) {
                w.join();
            }
        }
};

int main() {
    ReaderWriter<int> rw(5, 5);
    rw.run();
}
Ran answered 17/5, 2018 at 2:41 Comment(0)
O
3

Here's a code snippet. Since the worker threads are already synchronized, two buffers are not needed, so a single queue is used to simulate the scenario:

#include "conio.h"
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <atomic>
#include <condition_variable>

using namespace std;

enum state_t{ READ = 0, WRITE = 1 };

mutex mu;
condition_variable cv;
atomic<bool> running;
queue<int> buffer;
atomic<state_t> state;

void generate_test_data()
{
    const int times = 5;
    static int data = 0;

    for (int i = 0; i < times; i++) {
        data = (data + 1) % 100;  // was data = (data++) % 100, which never advances the value
        buffer.push(data);
    }
}

void ProducerThread() {

    while (running) {
        unique_lock<mutex> lock(mu);
        cv.wait(lock, []() { return !running || state == WRITE; });
        if (!running) return;
        generate_test_data(); //producing here
        lock.unlock();

        //notify consumer to start consuming
        state = READ;
        cv.notify_one();
    }
}

void ConsumerThread() {

    while (running) {

        unique_lock<mutex> lock(mu);
        cv.wait(lock, []() { return !running || state == READ; });
        if (!running) return;
        while (!buffer.empty()) {
            auto data = buffer.front();  //consuming here
            buffer.pop();                  
            cout << data << " \n";
        }

        //notify producer to start producing
        if (buffer.empty()) {
            state = WRITE;
            cv.notify_one();
        }
    }
}

int main(){
    running = true;
    thread producer = thread([]() { ProducerThread(); });
    thread consumer = thread([]() { ConsumerThread(); });

    //simulating gui thread
    while (!getch()){
    }

    running = false;
    producer.join();
    consumer.join();
}
Observatory answered 13/5, 2018 at 3:41 Comment(1)
There are race conditions because state is modified and the condition is signalled without the mutex locked. After ConsumerThread has evaluated state == READ as false, ProducerThread can execute state = READ; cv.notify_one(); before ConsumerThread blocks on the condition variable, which leads to a missed notification. This is a very common mistake. - Catholicity
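
One way to avoid the missed notification described in the comment above is to change the state while the mutex is still held. The following is only a sketch under assumptions: the driver `ping_pong`, the `State` enum, and the fixed number of rounds are invented for the illustration and are not taken from the answer.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

enum class State { Write, Read };

// Hypothetical driver: producer and consumer alternate for `rounds` rounds.
// Returns everything the consumer read, in order.
std::vector<int> ping_pong(int rounds) {
    std::mutex mu;
    std::condition_variable cv;
    std::queue<int> buffer;      // guarded by mu
    State state = State::Write;  // guarded by mu, so a plain enum suffices
    std::vector<int> seen;       // touched only by the consumer

    std::thread producer([&] {
        for (int r = 0; r < rounds; ++r) {
            std::unique_lock<std::mutex> lock(mu);
            cv.wait(lock, [&] { return state == State::Write; });
            buffer.push(r);
            state = State::Read;  // changed while mu is still held
            lock.unlock();
            cv.notify_one();      // notifying after unlocking is fine
        }
    });

    std::thread consumer([&] {
        for (int r = 0; r < rounds; ++r) {
            std::unique_lock<std::mutex> lock(mu);
            cv.wait(lock, [&] { return state == State::Read; });
            while (!buffer.empty()) {
                seen.push_back(buffer.front());
                buffer.pop();
            }
            state = State::Write;  // changed while mu is still held
            lock.unlock();
            cv.notify_one();
        }
    });

    producer.join();
    consumer.join();
    assert(buffer.empty());
    return seen;
}
```

Because the waiter checks the predicate under the same mutex that protects the state change, it either sees the new state before blocking or is already blocked when the notification arrives.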
M
3

Not a complete answer, but I think two condition variables could help: one named buffer_empty that the producer thread waits on, and another named buffer_filled that the consumer thread waits on. I can't comment on the number of mutexes, how to loop, and so on, since I'm not sure about the details myself.
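
A rough sketch of that idea, assuming a single mutex, a single shared slot, and a `filled` flag (all of which, like the driver name `transfer`, are assumptions made for this illustration):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical driver: hands `batches` batches from producer to consumer
// through one shared slot. Returns the sum of everything consumed.
long transfer(int batches) {
    std::mutex m;
    std::condition_variable buffer_empty;   // the producer waits on this
    std::condition_variable buffer_filled;  // the consumer waits on this
    std::vector<int> slot;                  // guarded by m
    bool filled = false;                    // guarded by m
    long sum = 0;                           // touched only by the consumer

    std::thread producer([&] {
        for (int b = 0; b < batches; ++b) {
            std::vector<int> local{b, b, b};  // produce outside the lock
            std::unique_lock<std::mutex> lk(m);
            buffer_empty.wait(lk, [&] { return !filled; });
            slot.swap(local);                 // slot is empty at this point
            filled = true;
            buffer_filled.notify_one();
        }
    });

    std::thread consumer([&] {
        for (int b = 0; b < batches; ++b) {
            std::vector<int> local;
            {
                std::unique_lock<std::mutex> lk(m);
                buffer_filled.wait(lk, [&] { return filled; });
                local.swap(slot);             // leaves slot empty
                filled = false;
                buffer_empty.notify_one();
            }
            for (int v : local) sum += v;     // consume outside the lock
        }
    });

    producer.join();
    consumer.join();
    assert(!filled);  // the last batch was taken
    return sum;
}
```

With two condition variables each thread is only woken by the event it actually cares about, which can be easier to reason about than sharing one.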

Myasthenia answered 15/5, 2018 at 15:37 Comment(0)
D
2
  1. Accesses to shared variables should only be made while holding the mutex that protects them.
  2. condition_variable::wait should check a condition.
    1. The condition should be a shared variable protected by the mutex that you pass to condition_variable::wait.
    2. The way to check the condition is to wrap the call to wait in a while loop or use the 2-argument overload of wait (which is equivalent to the while-loop version)

Note: These rules aren't strictly necessary if you truly understand what the hardware is doing. However, these problems get complicated quickly even with simple data structures, and it will be easier to prove that your algorithm works correctly if you follow them.

Your Q and Q_buf are shared variables. Due to Rule 1, I would prefer to have them as local variables declared in the function that uses them (consume() and produce(), respectively). There will be 1 shared buffer that will be protected by a mutex. The producer will add to its local buffer. When that buffer is full, it acquires the mutex and pushes the local buffer to the shared buffer. It then waits for the consumer to accept this buffer before producing more data.

The consumer waits for this shared buffer to "arrive", then it acquires the mutex and replaces its empty local buffer with the shared buffer. Then it signals to the producer that the buffer has been accepted so it knows to start producing again.

Semantically, I don't see a reason to use swap over move, since in every case one of the containers is empty anyway. Maybe you want to use swap because you know something about the underlying memory. You can use whichever you want and it will be fast and work the same (at least algorithmically).
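
To make the comparison concrete, here is a small single-threaded sketch of both hand-off styles. The helper names `take_by_swap` and `take_by_move` are hypothetical. One caveat worth stating: after a move assignment the standard only promises that the source vector is in a valid but unspecified state, so the sketch clears it explicitly before relying on empty().

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hand-off by swap: the two vectors exchange contents, so `from` ends up
// holding whatever `to` held before (here: nothing).
std::vector<int> take_by_swap(std::vector<int>& from) {
    std::vector<int> to;
    to.swap(from);
    return to;
}

// Hand-off by move assignment: `from` is left valid but unspecified,
// so clear() it before anything checks from.empty().
std::vector<int> take_by_move(std::vector<int>& from) {
    std::vector<int> to;
    to = std::move(from);
    from.clear();
    assert(from.empty());
    return to;
}
```

In both cases only the internal buffers are typically exchanged or transferred, not the elements, so neither variant copies the data.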

This problem can be done with 1 condition variable, but it may be a little easier to think about if you use 2.

Here's what I came up with. Tested on Visual Studio 2017 (15.6.7) and GCC 5.4.0. I don't need to be credited or anything (it's such a simple piece), but legally I have to say that I offer no warranties whatsoever.

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>


std::vector<int> g_deliveryBuffer;
bool g_quit = false;
std::mutex g_mutex;  // protects g_deliveryBuffer and g_quit
std::condition_variable g_producerDeliver;
std::condition_variable g_consumerAccepted;


// consumer
void consume() 
{
    // local buffer
    std::vector<int> consumerBuffer;

    while (true)
    {
        if (consumerBuffer.empty())
        {  
            std::unique_lock<std::mutex> lock(g_mutex);
            while (g_deliveryBuffer.empty() && !g_quit)  // if we beat the producer, wait for them to push to the deliverybuffer
                g_producerDeliver.wait(lock);
            if (g_quit)
                break;
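            consumerBuffer = std::move(g_deliveryBuffer);  // get the buffer
            g_deliveryBuffer.clear();  // a moved-from vector is only "valid but unspecified"; make it empty explicitly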
        }
        g_consumerAccepted.notify_one();  // notify the producer that the buffer has been accepted

        // for-loop to process the elems in Q
        // ...
        consumerBuffer.clear();
        // ...
    }
}


// producer
void produce() 
{
    std::vector<int> producerBuffer;
    while (true) 
    {
        // for-loop to fill up Q_buf
        // ...
        producerBuffer = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        // ...

        // once Q_buf is fully filled, wait until consumer asks to give it a full Q
        {   // scope is for lock
            std::unique_lock<std::mutex> lock(g_mutex);
            g_deliveryBuffer = std::move(producerBuffer);  // ok to push to deliverybuffer. it is guaranteed to be empty
            g_producerDeliver.notify_one();
            while (!g_deliveryBuffer.empty() && !g_quit)
                g_consumerAccepted.wait(lock);  // wait for consumer to signal for more data
            if (g_quit)
                break;
            // We will never reach this point if the buffer is not empty.
        }
    }
}



int main()
{
    // spawn threads
    std::thread consumerThread(consume);
    std::thread producerThread(produce);

    // let the threads run for 5 seconds
    std::this_thread::sleep_for(std::chrono::seconds(5));

    // signal that it's time to quit
    {
        std::lock_guard<std::mutex> lock(g_mutex);
        g_quit = true;
    }
    // one of the threads may be sleeping
    g_consumerAccepted.notify_one();
    g_producerDeliver.notify_one();

    consumerThread.join();
    producerThread.join();

    return 0;
}
Dacoity answered 19/5, 2018 at 18:45 Comment(0)
