I am trying to understand the basic multithreading mechanisms in the new C++ 11 standard. The most basic example I can think of is the following:
- A producer and a consumer are implemented in separate threads
- The producer places a certain amount of items inside a queue
- The consumer takes items from the queue if there are any present
This example is also used in many school books about multithreading and everything about the communication process works fine. However, I have a problem when it comes to stopping the consumer thread.
I want the consumer to run until it gets an explicit stop signal (in most cases this means that I wait for the producer to finish so I can stop the consumer before the program is ended). Unfortunately C++ 11 threads lack an interrupt mechanism (which I know from multithreading in Java for example). Thus, I have to use flags like isRunning
to signal that I want a thread to stop.
The main problem now is: After I have stopped the producer thread, the queue is empty and the consumer is waiting on a condition_variable
to get a signal when the queue is filled again. So I need to wake the thread up by calling notify_all()
on the variable before exiting.
I have found a working solution, but it seems somehow messy. The example code is listed below (I am sorry but somehow I couldn't reduce the code size any furhter for a "minimal" minimal example):
The Queue class:
class Queue{
public:
Queue() : m_isProgramStopped{ false } { }
void push(int i){
std::unique_lock<std::mutex> lock(m_mtx);
m_q.push(i);
m_cond.notify_one();
}
int pop(){
std::unique_lock<std::mutex> lock(m_mtx);
m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });
if (m_isProgramStopped){
throw std::exception("Program stopped!");
}
int x = m_q.front();
m_q.pop();
std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
return x;
}
void stop(){
m_isProgramStopped = true;
m_cond.notify_all();
}
private:
std::queue<int> m_q;
std::mutex m_mtx;
std::condition_variable m_cond;
bool m_isProgramStopped;
};
The Producer:
class Producer{
public:
Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }
void produce(){
for (int i = 0; i < 5; i++){
m_q.push(m_counter++);
std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
}
}
void execute(){
m_t = std::thread(&Producer::produce, this);
}
void join(){
m_t.join();
}
private:
Queue & m_q;
std::thread m_t;
unsigned int m_counter;
};
The Consumer:
class Consumer{
public:
Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
{ }
~Consumer(){
std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
}
void consume(){
while (m_isRunning){
try{
m_q.pop();
m_takeCounter++;
}
catch (std::exception e){
std::cout << "Program was stopped while waiting." << std::endl;
}
}
}
void execute(){
m_t = std::thread(&Consumer::consume, this);
}
void join(){
m_t.join();
}
void stop(){
m_isRunning = false;
}
private:
Queue & m_q;
std::thread m_t;
unsigned int m_takeCounter;
bool m_isRunning;
};
And finally the main()
:
int main(void){
Queue q;
Consumer cons{ q };
Producer prod{ q };
cons.execute();
prod.execute();
prod.join();
cons.stop();
q.stop();
cons.join();
std::cout << "END" << std::endl;
return EXIT_SUCCESS;
}
Is this the right way to end a thread that is waiting an a condition variable or are there better methods? Currently, the queue needs to know if the program has stopped (which in my opinion destroys the loose coupling of the components) and I need to call stop()
on the queue explicitly which doesn't seem right.
Additionaly, the condition variable which should just be used as a singal if the queue is empty now stands for another condition - if the program has ended. If I am not mistaken, every time a thread waits on a condition variable for some event to happen, it would also have to check if the thread has to be stopped before continuing its execution (which also seems wrong).
Do I have these problems because my entire design is faulty or am I missing some mechanisms that can be used to exit threads in a clean way?
stop()
from the destructor of your queue. See a similar solution https://mcmap.net/q/130869/-what-is-the-proper-way-of-doing-event-handling-in-c – HandgunQueue::m_isProgramStopped
andConsumer::m_isRunning
should beatomic<bool>
oratomic_flag
becausemain()
writes both of them while not protected by a mutex lock. – MonolithicQueue::m_isProgramStopped
atomic will make it data-race free, but still allows the race between thenotify
instop
andwait
inpop
. It's possible for a thread instop
tonotify
after a thread inpop
has checkedm_isProgramStopped
but before it has slept on the condition variable, resulting in the notification being lost and a main thread that waits forever for a consumer that will never exit. The better solution is to guard all accesses tom_isProgramStopped
withm_mtx
. – Imaldam_mtx
. – Lukeyexecute
andstop/join
multiple times, put the body of those functions in constructors and destructors respectively. As it is, it's a bit C-ish. – Lukey