Stopping C++ 11 std::threads waiting on a std::condition_variable

I am trying to understand the basic multithreading mechanisms in the new C++ 11 standard. The most basic example I can think of is the following:

  • A producer and a consumer are implemented in separate threads
  • The producer places a certain amount of items inside a queue
  • The consumer takes items from the queue if there are any present

This example is also used in many textbooks about multithreading, and everything about the communication process works fine. However, I have a problem when it comes to stopping the consumer thread.

I want the consumer to run until it gets an explicit stop signal (in most cases this means that I wait for the producer to finish so I can stop the consumer before the program is ended). Unfortunately C++ 11 threads lack an interrupt mechanism (which I know from multithreading in Java for example). Thus, I have to use flags like isRunning to signal that I want a thread to stop.

The main problem now is: After I have stopped the producer thread, the queue is empty and the consumer is waiting on a condition_variable to get a signal when the queue is filled again. So I need to wake the thread up by calling notify_all() on the variable before exiting.

I have found a working solution, but it seems somewhat messy. The example code is listed below (I am sorry, but I couldn't reduce the code any further to get a truly minimal example):

The Queue class:

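#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

class Queue{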
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });

        if (m_isProgramStopped){
            // std::exception has no string constructor in standard C++
            throw std::runtime_error("Program stopped!");
        }

        int x = m_q.front();
        m_q.pop();

        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

    void stop(){
        m_isProgramStopped = true;
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};

The Producer:

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_counter;
};

The Consumer:

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (const std::exception &){
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_takeCounter;
    bool m_isRunning;
};

And finally the main():

int main(void){
    Queue q;

    Consumer cons{ q };
    Producer prod{ q };

    cons.execute();
    prod.execute();

    prod.join();

    cons.stop();
    q.stop();

    cons.join();

    std::cout << "END" << std::endl;

    return EXIT_SUCCESS;
}

Is this the right way to end a thread that is waiting on a condition variable, or are there better methods? Currently, the queue needs to know if the program has stopped (which in my opinion destroys the loose coupling of the components), and I need to call stop() on the queue explicitly, which doesn't seem right.

Additionally, the condition variable, which should just be used as a signal that the queue is no longer empty, now stands for another condition: whether the program has ended. If I am not mistaken, every time a thread waits on a condition variable for some event to happen, it would also have to check if the thread has to be stopped before continuing its execution (which also seems wrong).

Do I have these problems because my entire design is faulty or am I missing some mechanisms that can be used to exit threads in a clean way?

Joyner answered 13/2, 2014 at 14:33 Comment(9)
That is pretty much what we do in our code. Set a "stopping" variable and notify the condition variable, and have it test that flag as the first thing it does. There doesn't seem to be an easy, more elegant general solution that we have found. - Nonobedience
You can call stop() from the destructor of your queue. See a similar solution https://mcmap.net/q/130869/-what-is-the-proper-way-of-doing-event-handling-in-c - Handgun
If you just want to implement a "Producer/Consumer" pattern, then there are other approaches which solve this problem without the need to find a solution for the nasty problem of "invalidating" or "canceling" a synchronization primitive in order to resume a thread. - Amphibious
You could also use a Sentinel (c2.com/cgi/wiki?SentinelPattern) - Sorites
Both Queue::m_isProgramStopped and Consumer::m_isRunning should be atomic<bool> or atomic_flag because main() writes both of them while not protected by a mutex lock. - Monolithic
@Monolithic I think the condition variable notify and wait provide a memory barrier making this unnecessary. Possibly. Don't quote me on that though... - Nonobedience
@Monolithic Making Queue::m_isProgramStopped atomic will make it data-race free, but still allows the race between the notify in stop and wait in pop. It's possible for a thread in stop to notify after a thread in pop has checked m_isProgramStopped but before it has slept on the condition variable, resulting in the notification being lost and a main thread that waits forever for a consumer that will never exit. The better solution is to guard all accesses to m_isProgramStopped with m_mtx. - Imalda
What Casey said. Notifying conditions should be locked by the same mutex on which it waits somewhere else in the code. I've made the same mistake before. Essentially, the first line of Queue::stop should be lock m_mtx. - Lukey
On style: if you don't expect to call execute and stop/join multiple times, put the body of those functions in constructors and destructors respectively. As it is, it's a bit C-ish. - Lukey
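
The locking fix suggested in the comments above can be sketched roughly as follows. This is only a minimal sketch: LockedQueue and run_locked_demo are hypothetical names introduced for illustration, and the class is a trimmed-down version of the question's Queue in which the stop flag is read and written only while m_mtx is held.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

// Hypothetical variant of the question's Queue: the stop flag is only ever
// read or written while m_mtx is held.
class LockedQueue {
public:
    void push(int i) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_q.push(i);
        }
        m_cond.notify_one();
    }

    int pop() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [this] { return !m_q.empty() || m_stopped; });
        if (m_stopped) {
            throw std::runtime_error("Program stopped!");
        }
        int x = m_q.front();
        m_q.pop();
        return x;
    }

    void stop() {
        {
            // Holding the mutex here closes the window between a waiter's
            // predicate check and its going to sleep, so the notification
            // below cannot be lost.
            std::lock_guard<std::mutex> lock(m_mtx);
            m_stopped = true;
        }
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_stopped = false;
};

// Illustrative driver: pushes `count` items, stops the queue, and returns
// how many items the consumer thread managed to pop before the stop.
int run_locked_demo(int count) {
    LockedQueue q;
    int taken = 0;
    std::thread consumer([&] {
        try {
            for (;;) {
                q.pop();
                ++taken;
            }
        } catch (const std::runtime_error &) {
            // stop() was called; fall out of the loop.
        }
    });
    for (int i = 0; i < count; ++i) {
        q.push(i);
    }
    q.stop();
    consumer.join();  // after join, reading `taken` is safe
    assert(taken <= count);
    return taken;
}
```

As in the question's pop, a stop takes priority over items still in the queue, so the returned count may be lower than the number pushed. Because the exception ends the consumer loop, this sketch does not need a separate m_isRunning flag in the consumer.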

No, there's nothing wrong with your design, and it's the normal approach taken for this sort of problem.

It's perfectly valid for you to have multiple conditions (e.g., an item on the queue or the program stopping) attached to a condition variable. The key thing is that each part of the condition is checked when the wait returns.

Instead of having a flag in Queue to indicate that the program is stopping you should think of the flag as "can I accept". This is a better overall paradigm and works better in a multi-threaded environment.

Also, instead of having pop throw an exception when it is called after stop, you could replace the method with bool try_pop(int &value), which returns true if a value was returned and false otherwise. This way the caller can check on failure whether the queue has been stopped (add a bool is_stopped() const method). Although exception handling works here, it's a bit heavy-handed, and a stop isn't really an exceptional case in a multi-threaded program.
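
One way the try_pop and is_stopped idea could look is sketched below. The names ClosableQueue, m_accepting, and run_closable_demo are hypothetical. The sketch also makes two assumptions that go beyond the answer: push reports whether the item was accepted, and try_pop keeps handing out queued items after stop and only returns false once the queue is both stopped and empty.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical queue whose flag answers "can I accept more items?".
class ClosableQueue {
public:
    // Returns false if the queue no longer accepts items.
    bool push(int i) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            if (!m_accepting) {
                return false;
            }
            m_q.push(i);
        }
        m_cond.notify_one();
        return true;
    }

    // Blocks until an item is available or the queue has been stopped.
    // Returns false only when the queue is stopped and already drained.
    bool try_pop(int &value) {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [this] { return !m_q.empty() || !m_accepting; });
        if (m_q.empty()) {
            return false;
        }
        value = m_q.front();
        m_q.pop();
        return true;
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_accepting = false;
        }
        m_cond.notify_all();
    }

    bool is_stopped() const {
        std::lock_guard<std::mutex> lock(m_mtx);
        return !m_accepting;
    }

private:
    std::queue<int> m_q;
    mutable std::mutex m_mtx;  // mutable so is_stopped() can stay const
    std::condition_variable m_cond;
    bool m_accepting = true;
};

// Illustrative driver: returns how many items the consumer received.
int run_closable_demo(int count) {
    ClosableQueue q;
    int taken = 0;
    std::thread consumer([&] {
        int value = 0;
        // The loop ends without exceptions once try_pop reports failure.
        while (q.try_pop(value)) {
            ++taken;
        }
        assert(q.is_stopped());
    });
    for (int i = 0; i < count; ++i) {
        q.push(i);
    }
    q.stop();
    consumer.join();
    return taken;
}
```

With the drain-before-false assumption, every item pushed before stop is still delivered, so the consumer loop reduces to while (q.try_pop(value)) and no try/catch is needed.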

Contemn answered 13/2, 2014 at 14:58 Comment(2)
But if the task run by the thread takes a very long time to process, then you need to wait a while before the stop() call takes effect. So in that regard, is that the best possible design? - Eveevection
Could you elaborate on what you mean by 'Instead of having a flag in Queue to indicate that the program is stopping you should think of the flag as "can I accept"'? - Printery

wait can be called with a timeout. Control then returns to the thread, which can check the stop flag. Depending on that value, the thread can wait for more items to consume or finish execution. A good introduction to multithreading with C++ is C++11 Concurrency.
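
A rough sketch of the timeout approach, using std::condition_variable::wait_for with a predicate, might look like this. PollingQueue, pop_for, run_polling_demo, and the 10 ms timeout are all assumptions made for illustration. Here the queue knows nothing about stopping, and the consumer checks its own atomic flag between timed waits.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical queue with a timed pop; it has no stop flag of its own.
class PollingQueue {
public:
    void push(int i) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_q.push(i);
        }
        m_cond.notify_one();
    }

    // Waits up to `timeout` for an item. Returns false if the queue is
    // still empty when the timeout expires.
    bool pop_for(int &value, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cond.wait_for(lock, timeout, [this] { return !m_q.empty(); })) {
            return false;
        }
        value = m_q.front();
        m_q.pop();
        return true;
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
};

// Illustrative driver: returns how many items the consumer received.
int run_polling_demo(int count) {
    PollingQueue q;
    std::atomic<bool> running{true};
    int taken = 0;
    std::thread consumer([&] {
        int value = 0;
        for (;;) {
            // Read the flag BEFORE the timed wait: if a stop was already
            // requested and the wait still times out, nothing is left.
            const bool stop_requested = !running.load();
            if (q.pop_for(value, std::chrono::milliseconds(10))) {
                ++taken;
            } else if (stop_requested) {
                break;
            }
        }
    });
    for (int i = 0; i < count; ++i) {
        q.push(i);
    }
    running.store(false);
    consumer.join();
    return taken;
}
```

The trade-off is latency versus wake-ups: the consumer notices a stop request only after a timed wait expires, and an idle consumer wakes up once per timeout interval even when there is nothing to do.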

Dwaindwaine answered 13/2, 2014 at 15:51 Comment(2)
Just to make it clear, it is not an overload of std::condition_variable::wait but a different method, std::condition_variable::wait_for - Chilcote
@Chilcote It's a solution, but seems not so good. - Aurar
