C++11 lockless queue using std::atomic (multi writer, single consumer)
Asked Answered
S

2

6

I've produced a simple implementation of the lockless (lockfree) queue using the new std::atomic in C++11. I can't see what I'm doing wrong here.

#include <atomic>

template<typename T>
class lockless_queue
{
public:
    template<typename DataType>
    struct node
    {
        node(const DataType& data)
          : data(data), next(nullptr) {}
        DataType data;
        node* next;
    };

    lockless_queue()
      : head_(nullptr) {}

    void produce(const T &data)
    {
        node<T>* new_node = new node<T>(data);
        // put the current value of head into new_node->next
        new_node->next = head_.load(std::memory_order_relaxed);
        // now make new_node the new head, but if the head
        // is no longer what's stored in new_node->next
        // (some other thread must have inserted a node just now)
        // then put that new head into new_node->next and try again
        while(!std::atomic_compare_exchange_weak_explicit(
            &head_,
            &new_node->next,
            new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    node<T>* consume_all()
    {
        // Reset queue and return head atomically
        return head_.exchange(nullptr, std::memory_order_consume);
    }
private:
    std::atomic<node<T>*> head_;
};

// main.cpp
#include <iostream>

int main()
{
    lockless_queue<int> s;
    s.produce(1);
    s.produce(2);
    s.produce(3);
    auto head = s.consume_all();
    while (head)
    {
        auto tmp = head->next;
        std::cout << tmp->data << std::endl;
        delete head;
        head = tmp;
    }
}

And my output:

2
1
Segmentation fault (core dumped)

Can I have some pointers where to look or an indication what I could be doing wrong?

Thanks!

Sophy answered 20/12, 2013 at 18:57 Comment(2)
Just in case you didn't know (not suggesting you should give up your endevour) but boost has a lock free queue. boost.org/doc/libs/1_53_0/doc/html/boost/lockfree/queue.htmlVaporizer
You are pushing on three items, then popping them off, sequentially, in one thread. Why can you not debug this? If you cannot fix it with one thread and no contention, I don't hold out much hope for multiple producers and a consumer.Padraic
B
3

You are dereferencing tmp instead of head:

while (head)
    {
        auto tmp = head->next;
        std::cout << tmp->data << std::endl;
        delete head;
        head = tmp;
    }

should be:

while (head)
    {
        std::cout << head->data << std::endl;
        auto tmp = head->next;
        delete head;
        head = tmp;
    }

This is why 3 doesn't appear in your output and Segmentation fault does.

Barcus answered 20/12, 2013 at 19:24 Comment(0)
I
2

You have another error in your code that won't show up until you start trying to perform concurrent enqueues. If your compare_exchange_weak_explicit fails, that implies that another thread managed to change the head pointer, and as such before you can try your CAS again, you need to re-load the new value of the head pointer into your new_node->next. The following will do the trick:

    while(!std::atomic_compare_exchange_weak_explicit(
        &head_,
        &new_node->next,
        new_node,
        std::memory_order_release,
        std::memory_order_relaxed)) {
        new_node->next = head_.load(std::memory_order_relaxed);
    }
Inexcusable answered 22/12, 2013 at 17:42 Comment(1)
Well actually not. The std::atomic_compare_exchange will update the expected value to the actual value if it fail. It's debatable whether this is desirable functionality. And you might even want to do it yourself anyway. But std::atomic_compare_exchange will do it for you.Touslesmois

© 2022 - 2024 — McMap. All rights reserved.