C++11 lockfree single producer single consumer: how to avoid busy wait

I'm trying to implement a class that uses two threads: one for the producer and one for the consumer. The current implementation does not use locks:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <iostream>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

The application needs to enqueue work items for a certain amount of time and then sleep waiting for an event. This is a minimal main that simulates the behavior:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

I'm pretty sure that my implementation is buggy: what if the worker thread completes, and before it executes working_ = false, another enqueue comes? Is it possible to make my code thread-safe without using locks?

The solution requires:

  • a fast enqueue
  • the destructor has to quit even if the queue is not empty
  • no busy wait, because there are long periods of time in which the worker thread is idle
  • no locks if possible

Edit

I did another implementation of the Worker class, based on your suggestions. Here is my second attempt:

class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

I introduced the worker_.join() inside the enqueue method. This can hurt performance, but only in very rare cases (when the queue becomes empty and another enqueue comes just before the thread exits). The working_ variable is now an atomic_flag that is set in enqueue and cleared in work. The additional while loop after working_.clear() is needed because if another value is pushed after the first loop exits but before the clear, that value would otherwise never be processed.

Is this implementation correct?

I did some tests and the implementation seems to work.

OT: Is it better to put this as an edit, or an answer?

Flaunt answered 9/6, 2014 at 12:6 Comment(8)
I think that enqueue should use std::atomic_compare_exchange_weak or std::atomic_compare_exchange_strongSubcutaneous
Why are you against locks? Are you expecting your threads to be suspended? Or are you of the impression that locks reduce performance?Renfrew
@Yakk I am not against locks per se. The enqueue is called by performance-critical code, so I'm trying to optimize that part. Yes, thread suspension is a possible problem.Flaunt
@michele.bertasi So you want enqueue to be able to work and advance (without much cost) even if there is a priority inversion and/or the worker thread has been suspended and/or the worker thread is just never scheduled? How critical performance -- per pixel per frame operation on a 4k x 2k buffer performance critical, where if the image isn't rendered at 60 Hz someone dies? Or, "I don't want to make the user annoyed when they click a UI element"? Probably somewhere in between, but where?Renfrew
@Yakk Somewhere in the middle; actually a per-frame operation on a real time streaming video.Flaunt
I think you're better off not letting the thread die in-between; have it wait on a semaphore instead. You'd have to reset semaphore in the destructor, though.Zwolle
@Zwolle Maybe you are right. I will give it a try.Flaunt
Actually, it's occurred to me that what you really need is simply a blocking queue (that's fast and lock-free under high contention, and blocks the thread harmlessly otherwise), with a slight twist to allow the thread to exit while it's waiting. Of course, I don't know of any good implementations of one in C++, but the typical way is to build one on top of a normal thread-safe queue using semaphores.Zwolle

This is my solution to the question. I don't much like answering my own question, but I think showing actual code may help others.

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <iostream>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"

using Queue =
    boost::lockfree::spsc_queue<
        int,
        boost::lockfree::capacity<1024>>;

class Worker
{
public:
    // the worker thread starts in the constructor
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
        , worker_([this]{ work(); })
    { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        semaphore_.signal();
        worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set())
            // signal to the worker thread to wake up
            semaphore_.signal();
        return enqueued;
    }

    void work() {
        int value;
        // the worker thread continues to live
        while (!done_) {
            // wait for the start signal, sleeping
            semaphore_.wait();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
            working_.clear();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    binsem semaphore_;
    Queue queue_;
    std::thread worker_;
};

I tried @Cameron's suggestion not to shut down the thread, and added a semaphore instead. The semaphore is actually used only for the first enqueue and for the last work. It is not lock-free, but only in these two cases.

I did a performance comparison between my previous version (see my edited question) and this one. There are no significant differences when there are not many starts and stops. However, the enqueue is 10 times faster when it has to signal the worker thread instead of starting a new one. This is a rare case, so it is not very important, but it is still an improvement.

This implementation satisfies:

  • lock-free in the common case (when enqueue and work are busy)
  • no busy wait during long periods with no enqueues
  • the destructor exits as soon as possible
  • correctness?? :)
Flaunt answered 10/6, 2014 at 17:44 Comment(3)
Seems a little better to me, but it's definitely not correct: working_ could be true during enqueue (so the semaphore doesn't get set), but the worker thread could already be out of the first inner while loop (and just hasn't cleared working_ yet). Then it will wait on the semaphore forever (until the next enqueue). I suggest you use the semaphore exclusively -- although having just poked around, I don't see any nice ones that spin for a few cycles (lock-free) before blocking on an OS primitive...Zwolle
@Zwolle If the worker thread has not cleared working_ but another enqueue comes, it is not a problem, because after the clear there is another loop that ensures those items are processed. In the meantime, if yet another enqueue comes, the working_ flag is now 0, so the semaphore gets the post.Flaunt
Hmm, I missed that. Interesting. Why not just get rid of working_ altogether then? (Ah, I think I know why: the semaphore is heavyweight -- I really don't see a reason why the semaphore can't just be an atomic integer and a simple spin-wait that falls back to a heavier OS primitive only if necessary; I might write such an implementation soon, it seems that a lot of code could benefit from a lightweight semaphore).Zwolle

what if the worker thread completes and before executing working_ = false, another enqueue comes?

Then the value will be pushed to the queue but will not be processed until another value is enqueued after the flag is set. You (or your users) may decide whether that is acceptable. This can be avoided using locks, but they're against your requirements.

The code may fail if the running thread is about to finish and sets working_ = false; but hasn't stopped running before the next value is enqueued. In that case your code will call operator= on the still-running thread, which results in a call to std::terminate according to the linked documentation.

Adding worker_.join() before assigning the worker to a new thread should prevent that.

Another problem is that queue_.push may fail if the queue is full, because it has a fixed size. Currently you just ignore that case, and the value is silently dropped. If you wait for the queue to have space, you don't get a fast enqueue (in that edge case). You could instead take the bool returned by push (which tells whether it succeeded) and return it from enqueue. That way the caller may decide whether to wait or to discard the value.

Or use non-fixed size queue. Boost has this to say about that choice:

Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way to achieve lock-freedom.

Frigidaire answered 9/6, 2014 at 12:49 Comment(5)
I don't start a thread at every enqueue, only at the first one. The thread continues to work until the queue is empty, then stops (this is because work operations are much slower than enqueues; maybe that is not clear from the code I've posted). The problem with busy waiting is that in my specific case there are long periods of time in which the worker thread is not working. My choice was made for that specific reason.Flaunt
Ah, yes, I read your code wrong, the lower half of my answer is pretty much incorrect. I've now removed it.Frigidaire
I've edited my question for clarity on the requirementsFlaunt
I've fixed my answer which was still partly wrong because of my wrong interpretation of your code. I also have doubts now that atomic comparison doesn't help in this case after all.Frigidaire
You are right, the enqueue can fail, but in my case the queue size can be bounded. However, it is certainly better to return a boolean in case it fails, at least to allow an assertion.Flaunt

Your worker thread needs more than 2 states.

  • Not running
  • Doing tasks
  • Idle shutdown
  • Shutdown

If you force a shutdown, it skips idle shutdown. If it runs out of tasks, it transitions to idle shutdown. In idle shutdown, it empties the task queue, then transitions to shutdown.

Shutdown is set, then you walk off the end of your worker task.

The producer first puts things on the queue. Then it checks the worker state. If Shutdown or Idle shutdown, it first joins the thread (transitioning it to Not running), then launches a new worker. If Not running, it just launches a new worker.

If the producer wants to launch a new worker, it first makes sure that we are in the not running state (otherwise, logic error). We then transition to the Doing tasks state, and then we launch the worker thread.

If the producer wants to shut down the helper task, it sets the done flag. It then checks the worker state. If it is anything besides not running, it joins it.

This can result in a worker thread that is launched for no good reason.

There are a few cases where the above can block, but there were a few before as well.

Then, we write a formal or semi-formal proof that the above cannot lose messages, because when writing lock free code you aren't done until you have a proof.

Renfrew answered 9/6, 2014 at 14:7 Comment(2)
I don't understand why "Idle shutdown" is needed. Isn't "Shutdown" enough?Flaunt
@michele.bertasi It does not have to be shown externally, but there are two different code paths in the actual worker thread. After going to idle shutdown, you must then empty the queue (or risk losing messages), then go to shutdown. When you force a shutdown, however, you do not want to empty the queue, so you go straight to shutdown.Renfrew

Very partial answer: I think all those atomics, semaphores, and states are a back-communication channel from "the thread" to "the Worker". Why not use another queue for that? At the very least, thinking about it will help you reason your way around the problem.

Casaba answered 21/8, 2019 at 11:50 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.