I'm trying to implement a class that uses two threads: one for the producer and one for the consumer. The current implementation does not use locks:
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
using Queue =
    boost::lockfree::spsc_queue<
        int,
        boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}

    ~Worker() {
        done_ = true; // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};
The application needs to enqueue work items for a while and then sleep while waiting for an event. Here is a minimal main that simulates this behavior:
int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
I'm pretty sure that my implementation is buggy: what if the worker thread finishes its loop and, before it executes working_ = false, another enqueue call comes in? Is it possible to make my code thread-safe without using locks?
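The interleaving described in the question can be replayed deterministically on a single thread. The sketch below is a hypothetical simulation, not the real Worker: a std::deque stands in for the spsc_queue, and each thread's step is an ordinary function call executed in the problematic order. It models only the order of the steps, not real concurrency.

```cpp
#include <cassert>
#include <deque>

// Hypothetical single-threaded replay of the first Worker's protocol.
struct Replay {
    std::deque<int> queue;        // stands in for the spsc_queue
    bool working = false;         // stands in for working_
    bool thread_started = false;  // records whether enqueue would start a thread

    // Producer side of enqueue(): push, then start a thread only if idle.
    void producer_enqueue(int value) {
        queue.push_back(value);
        if (!working) {
            working = true;
            thread_started = true;
        }
    }

    // Consumer side, one iteration of the pop loop in work().
    bool consumer_try_pop(int& value) {
        if (queue.empty()) return false;
        value = queue.front();
        queue.pop_front();
        return true;
    }

    // Consumer side, the statement after the loop in work().
    void consumer_finish() { working = false; }
};

// Returns the number of items left in the queue with no thread to process them.
int replay_lost_item() {
    Replay r;
    r.working = true;                     // a worker thread is already running
    int value;
    while (r.consumer_try_pop(value)) {}  // worker finds the queue empty, leaves the loop
    r.producer_enqueue(42);               // producer sees working == true: no new thread
    r.consumer_finish();                  // worker now sets working = false and exits
    assert(!r.thread_started);            // nobody will ever pop the 42
    return static_cast<int>(r.queue.size());
}
```

The call returns 1: the pushed item stays in the queue until the next enqueue starts a thread, which in the main above may be a full second later.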
The solution requires:
- a fast enqueue
- a destructor that returns even if the queue is not empty
- no busy waiting, because the worker thread is idle for long periods of time
- no locks, if possible
Edit
I wrote another implementation of the Worker class based on your suggestions. Here is my second attempt:
class Worker
{
public:
    Worker() : done_(false) { }

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    // the standard only guarantees ATOMIC_FLAG_INIT in the "= ATOMIC_FLAG_INIT" form
    std::atomic_flag working_ = ATOMIC_FLAG_INIT;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};
I introduced the worker_.join() call inside the enqueue method. This can hurt performance, but only in very rare cases: when the queue becomes empty and, before the worker thread exits, another enqueue call comes in. The working_ variable is now an atomic_flag that is set in enqueue and cleared in work. The additional while loop after working_.clear() is needed because a value pushed after the first while loop ends but before the clear would otherwise never be processed.
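The reasoning behind the second while loop can be checked with the same kind of single-threaded replay. This is a hypothetical model, not the real class: a std::deque stands in for the queue and a bool with a test_and_set helper stands in for the atomic_flag. It only models the order of steps, not the memory-ordering guarantees that real threads rely on.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical single-threaded replay of the second Worker's protocol.
struct ReplayV2 {
    std::deque<int> queue;
    bool flag = true;           // set: a worker thread is currently running
    std::vector<int> processed;
    int threads_started = 0;

    // Same contract as atomic_flag::test_and_set: set the flag, return the old value.
    bool test_and_set() { bool old = flag; flag = true; return old; }

    void producer_enqueue(int value) {
        queue.push_back(value);
        if (!test_and_set()) ++threads_started;  // the real code would join + start a thread
    }

    void drain() {  // one of the two while loops in work()
        while (!queue.empty()) {
            processed.push_back(queue.front());
            queue.pop_front();
        }
    }
};

// A push lands after the first loop ends but before working_.clear().
std::size_t replay_second_loop() {
    ReplayV2 r;
    r.drain();                  // first while loop: queue is empty, loop ends
    r.producer_enqueue(7);      // flag still set, so enqueue starts no thread
    r.flag = false;             // working_.clear()
    r.drain();                  // second while loop picks up the item
    assert(r.threads_started == 0);
    return r.processed.size();
}
```

Without the second drain() call, processed would stay empty and the 7 would be stranded, just as in the first version.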
Is this implementation correct?
I ran some tests, and the implementation seems to work.
Off-topic: is it better to post this as an edit or as an answer?
enqueue should use std::atomic_compare_exchange_weak or std::atomic_compare_exchange_strong. – Subcutaneous
enqueue is called by performance-critical code, so I'm trying to optimize that part. Yes, thread suspension is a possible problem. – Flaunt
enqueue to be able to work and advance (without much cost) even if there is a priority inversion and/or the worker thread has been suspended and/or the worker thread is just never scheduled? How performance-critical: a per-pixel, per-frame operation on a 4k x 2k buffer, where if the image isn't rendered at 60 Hz someone dies? Or "I don't want to annoy the user when they click a UI element"? Probably somewhere in between, but where? – Renfrew
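The compare-exchange suggestion in the first comment amounts to replacing the separate read (if (!working_)) and write (working_ = true) with one atomic read-modify-write. A minimal sketch, assuming a std::atomic<bool> flag as in the first Worker and using the member form of compare_exchange_strong; try_claim is a hypothetical helper name:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: atomically switch the flag from false to true.
// Returns true only for the caller that performed the switch; on failure,
// compare_exchange_strong stores the current value (true) into `expected`.
bool try_claim(std::atomic<bool>& working) {
    bool expected = false;
    return working.compare_exchange_strong(expected, true);
}
```

This closes the check-then-set gap on the producer side, but on its own it does not fix the interleaving asked about in the question: if the worker has already left its loop, the producer's claim fails, and the worker then stores false, the pushed item is still stranded. That case still needs the queue to be re-checked after the flag is cleared, or a worker thread that stays alive.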