Lock-Free Multiple Producer/Consumer Queue in C++11

I'm trying to implement a lock-free multiple-producer, multiple-consumer queue in C++11. I'm doing this as a learning exercise, so I'm well aware that I could just use an existing open-source implementation, but I'd really like to find out why my code doesn't work. The data is stored in a ring buffer, which apparently makes this a "bounded MPMC queue".

I've modelled it pretty closely on what I've read of the Disruptor. What I've noticed is that it works absolutely fine with a single consumer and single/multiple producers; it's only multiple consumers that seem to break it.

Here's the queue:

template <typename T>
class Queue : public IQueue<T>
{
public:
    explicit Queue( int capacity );
    ~Queue();

    bool try_push( T value );
    bool try_pop( T& value );
private:
    typedef struct
    {
        bool readable;
        T value;
    } Item;

    std::atomic<int> m_head;
    std::atomic<int> m_tail;
    int m_capacity;
    Item* m_items;
};

template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
    for( int i = 0; i < capacity; ++i )
    {
        m_items[i].readable = false;
    }
}

template <typename T>
Queue<T>::~Queue()
{
    delete[] m_items;
}

template <typename T>
bool Queue<T>::try_push( T value )
{
    while( true )
    {
        // See that there's room
        int tail = m_tail.load(std::memory_order_acquire);
        int new_tail = ( tail + 1 );
        int head = m_head.load(std::memory_order_acquire);

        if( ( new_tail - head ) >= m_capacity )
        {
            return false;
        }

        if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
        {
            // In try_pop, m_head is incremented before the reading of the value has completed,
            // so though we've acquired this slot, a consumer thread may be in the middle of reading
            tail %= m_capacity;

            std::atomic_thread_fence( std::memory_order_acquire );
            while( m_items[tail].readable )
            {
            }

            m_items[tail].value = value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[tail].readable = true;

            return true;
        }
    }
}

template <typename T>
bool Queue<T>::try_pop( T& value )
{
    while( true )
    {
        int head = m_head.load(std::memory_order_acquire);
        int tail = m_tail.load(std::memory_order_acquire);

        if( head == tail )
        {
            return false;
        }

        int new_head = ( head + 1 );

        if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
        {
            head %= m_capacity;

            std::atomic_thread_fence( std::memory_order_acquire );
            while( !m_items[head].readable )
            {
            }

            value = m_items[head].value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[head].readable = false;

            return true;
        }
    }
}

And here's the test I'm using:

void Test( std::string name, Queue<int>& queue )
{
    const int NUM_PRODUCERS = 64;
    const int NUM_CONSUMERS = 2;
    const int NUM_ITERATIONS = 512;
    bool table[NUM_PRODUCERS*NUM_ITERATIONS];
    memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));

    std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);

    std::chrono::system_clock::time_point start, end;
    start = std::chrono::system_clock::now();

    std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
    std::atomic<int> push_count (0);

    for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
    {
        threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
                                 {
                                     int base = thread_id * NUM_ITERATIONS;

                                     for( int i = 0; i < NUM_ITERATIONS; ++i )
                                     {
                                         while( !queue.try_push( base + i ) ){};
                                         push_count.fetch_add(1);
                                     }
                                 });
    }

    for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
    {
        threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
                                         {
                                             int v;

                                             while( pop_count.load() > 0 )
                                             {
                                                 if( queue.try_pop( v ) )
                                                 {
                                                     if( table[v] )
                                                     {
                                                         std::cout << v << " already set" << std::endl;
                                                     }
                                                     table[v] = true;
                                                     pop_count.fetch_sub(1);
                                                 }
                                             }
                                         });

    }

    for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
    {
        threads[i].join();
    }

    end = std::chrono::system_clock::now();
    std::chrono::duration<double> duration = end - start;

    std::cout << name << " " << duration.count() << std::endl;

    std::atomic_thread_fence( std::memory_order_acq_rel );

    bool result = true;
    for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
    {
        if( !table[i] )
        {
            std::cout << "failed at " << i << std::endl;
            result = false;
        }
    }
    std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}

Any nudging in the right direction would be greatly appreciated. I'm pretty new to memory fences rather than just using a mutex for everything, so I'm probably just fundamentally misunderstanding something.

Cheers J

Stereotypy answered 7/9, 2014 at 11:8 Comment(19)
You should add to your description that you're building a bounded MPMC queue. That's a pretty important aspect.Infallibilism
I'd never heard that term before, thanks =)Stereotypy
I don't like the asymmetry in thread fence acquire/release. You sure that is correct?Singleness
@LumpN which asymmetry do you mean?Stereotypy
@LumpN ooh, I think I know the bit you mean, I've just updated the code (it still isn't working though). Let me know if the asymmetry you saw is still there.Stereotypy
typedef struct is a rare sight in C++, since there's no need for it. Simply struct Item is fine.Halliard
You should probably delete or implement the copy/move ctors and assignment-operators. As far as I can see from a first, quick glance, the compiler-provided default implementations won't work correctly. Edit: Oh, well, you're using a raw owning pointer data member. So that will break it definitely.Halliard
I recommend putting your code on Code Review once it's working. There are several points where I think your code can be improved.Halliard
o.O your queue only supports 2 billion push operations (on typical PC C++ implementations) before producing undefined behaviour? That's interesting..Halliard
while( m_items[tail].readable ) is very suspicious. Since setting it can be postponed arbitrarily, and it's written to only after the value is set, you could accidentally overwrite a value.Halliard
Thanks for those @dyp. Are the copy/move ctors a reason why it isn't working in this case? As for supporting 2 billion push operations, I'm aware of this, I'd just like to get it working first! Could you expand on your comments on while( m_items[tail].readable ), it sounds like this could be the root of my problem, but I'd like to fully understand what you mean rather than just implementing a fix without understanding how it works. Thanks for the heads up on typedef struct, I had no idea! Thanks again for replying =)Stereotypy
I think there's also a conceptual problem: The compare_exchange basically tells everyone that there's another element to be read from (in push) or that has been read from => more capacity (in pop). So let's assume you're trying to push, but there's only one element capacity left and that last element hasn't yet been read from (popping in progress). [to be continued]Halliard
... Then the producer announces that there's another element to be read from by incrementing m_tail and waits until popping has finished (that's the intent, at least). Another consumer however now can start popping the very same element, since it's there (the producer says so) and it can be read from (since the other consumer has not cleared the flag yet).Halliard
To quote Yakk: "The proper way to approach a lock free data structure is to write a semi formal proof that your design works in pseudo code. You shouldn't be asking "is this lock free code thread safe", but rather "does my proof that this lock free code is thread safe have any errors?"" These things are notoriously difficult to get right. I suggest looking at an existing implementation to get some ideas how difficult it actually is (caveats etc.) and then start proofing.Halliard
Ah ha, yes, I'm just drawing some boxes on paper to visualise this, and I think I understand what you're saying. Given that the only thing used to determine if there is space is m_tail and m_head, double-reads and double-writes seem to both be possible from what I can tell. I know disruptor handles writes by having two heads, one for slots that have been claimed by a producer, and one for producers who have finished writing. The downside is that the producer at slot 3 has to wait for the producer at slot 2 to finish before they can update the second head and return from the push operation.Stereotypy
@Halliard did you want to submit a summary as an answer so I can mark it as correct? I can do it myself if you'd rather, but thought you might want the rep. Thanks again =)Stereotypy
No, I don't need the rep :) and I'm reluctant to write an answer here, given that I (think that I) didn't really point you in a direction where you can get that code to work, only pointed out some (possible) problems.Halliard
Cool, well when I have it working I'll polish it up as best I can and then submit it to Code Review as you suggested. I found another great problem: I read head and then tail, and then check for equality. But of course, tail could have advanced PAST head in between reads!Stereotypy
related: Lock-free Progress Guarantees has some analysis of the MPMC circular buffer queue implementation in liblfds. It's lockless, but not lock-free in a computer-science sense. But in practice it's very good and doesn't have much contention between producers and consumers, or between different producers or different consumers.Longer
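
For reference, here is a minimal sketch of the per-cell sequence-number approach these comments circle around; it is essentially Dmitry Vyukov's bounded MPMC queue design, not the asker's code, and the class name, members, and power-of-two capacity requirement are assumptions of the sketch. Each cell carries its own sequence counter, so a claimed-but-unwritten slot can never be confused with a published one:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

template <typename T>
class BoundedMpmcQueue
{
public:
    explicit BoundedMpmcQueue( size_t capacity ) // capacity must be a power of two
        : m_cells( capacity ), m_mask( capacity - 1 ), m_head( 0 ), m_tail( 0 )
    {
        for( size_t i = 0; i != capacity; ++i )
            m_cells[i].sequence.store( i, std::memory_order_relaxed );
    }

    bool try_push( T value )
    {
        size_t tail = m_tail.load( std::memory_order_relaxed );
        for( ;; )
        {
            Cell& cell = m_cells[tail & m_mask];
            size_t seq = cell.sequence.load( std::memory_order_acquire );
            intptr_t diff = intptr_t( seq ) - intptr_t( tail );
            if( diff == 0 )
            {
                // The cell is free for exactly this ticket; try to claim it.
                if( m_tail.compare_exchange_weak( tail, tail + 1, std::memory_order_relaxed ) )
                {
                    cell.value = std::move( value );
                    // Publish only after the value is fully written.
                    cell.sequence.store( tail + 1, std::memory_order_release );
                    return true;
                }
            }
            else if( diff < 0 )
                return false; // queue is full
            else
                tail = m_tail.load( std::memory_order_relaxed ); // another producer won; retry
        }
    }

    bool try_pop( T& value )
    {
        size_t head = m_head.load( std::memory_order_relaxed );
        for( ;; )
        {
            Cell& cell = m_cells[head & m_mask];
            size_t seq = cell.sequence.load( std::memory_order_acquire );
            intptr_t diff = intptr_t( seq ) - intptr_t( head + 1 );
            if( diff == 0 )
            {
                // The cell holds a fully published value for this ticket.
                if( m_head.compare_exchange_weak( head, head + 1, std::memory_order_relaxed ) )
                {
                    value = std::move( cell.value );
                    // Mark the cell free for the producer one lap ahead.
                    cell.sequence.store( head + m_mask + 1, std::memory_order_release );
                    return true;
                }
            }
            else if( diff < 0 )
                return false; // queue is empty
            else
                head = m_head.load( std::memory_order_relaxed ); // another consumer won; retry
        }
    }

private:
    struct Cell
    {
        std::atomic<size_t> sequence;
        T value;
    };

    std::vector<Cell> m_cells;
    const size_t m_mask;
    std::atomic<size_t> m_head;
    std::atomic<size_t> m_tail;
};

A producer may only claim a cell whose sequence equals its ticket, and it publishes by bumping the sequence only after the value is fully written; consumers mirror this, so the claimed/published race described in the comments above cannot occur.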

I'd take a look at Moody Camel's implementation.

It is a fast, general-purpose lock-free queue for C++, written entirely in C++11. The documentation seems rather good, and it comes with a few performance tests.

Among all other interesting things (they're worth a read anyway), it's all contained in a single header, and available under the simplified BSD license. Just drop it in your project and enjoy!
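
A minimal usage sketch, using the enqueue and try_dequeue calls from its README:

#include "concurrentqueue.h"
#include <cassert>

int main()
{
    moodycamel::ConcurrentQueue<int> q;
    q.enqueue( 25 );                     // safe to call from any thread

    int item;
    bool found = q.try_dequeue( item );  // non-blocking, safe from any thread
    assert( found && item == 25 );
}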

Miasma answered 31/10, 2015 at 17:30 Comment(1)
The second header file is for the blocking version.Miasma

The simplest approach uses a circular buffer. That is, it's like an array of 256 elements indexed by a uint8_t, so the index wraps around and starts at the beginning when it overflows.

The simplest primitive you can build upon is the case of a single producer thread and a single consumer thread.

The buffer has two heads:

  • Write head: it points to the element which will be written next.
  • Read head: it points to the element which will be read next.

Operation of the producer:

  1. If write head + 1 == read head, the buffer is full; return a buffer-full error.
  2. Write the content to the element.
  3. Insert a memory barrier to synchronize the CPU cores.
  4. Move the write head forward.

In the buffer-full case there is still one slot left, but we reserve it to distinguish the full state from the empty state.

Operation of the consumer:

  1. If read head == write head, the buffer is empty; return a buffer-empty error.
  2. Read the content of the element.
  3. Insert a memory barrier to synchronize the CPU cores.
  4. Move the read head forward.

The producer owns the write head and the consumer owns the read head, so there is no concurrency on those. Also, each head is updated only once the operation has completed; this ensures the producer leaves behind fully written elements, and the consumer leaves behind fully consumed, empty cells. A minimal sketch of this scheme follows.
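
A minimal C++11 sketch of the scheme just described (the class and member names are mine, and the capacity is fixed at 256 so a uint8_t index wraps around for free):

#include <atomic>
#include <cstdint>

template <typename T>
class SpscRing
{
public:
    bool try_push( const T& value )
    {
        uint8_t w = m_write.load( std::memory_order_relaxed );
        if( uint8_t( w + 1 ) == m_read.load( std::memory_order_acquire ) )
            return false; // full: one slot is sacrificed to distinguish full from empty
        m_items[w] = value;
        // Release barrier: the element must be visible before the head moves.
        m_write.store( uint8_t( w + 1 ), std::memory_order_release );
        return true;
    }

    bool try_pop( T& value )
    {
        uint8_t r = m_read.load( std::memory_order_relaxed );
        if( r == m_write.load( std::memory_order_acquire ) )
            return false; // empty
        value = m_items[r];
        // Release barrier: the read must complete before the slot is recycled.
        m_read.store( uint8_t( r + 1 ), std::memory_order_release );
        return true;
    }

private:
    T m_items[256];
    std::atomic<uint8_t> m_write { 0 }; // owned by the producer
    std::atomic<uint8_t> m_read { 0 };  // owned by the consumer
};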

Create two of these pipes, one in each direction, whenever you fork off a new thread, and you can have bidirectional communication with your threads.

Given that we are talking about lock-freedom, it also means none of the threads ever block: when there is nothing to do, the threads just spin. You may want to detect this and add some sleep when it happens, as in the sketch below.
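
For instance, a consumer loop might spin briefly, then yield, then sleep (a hand-rolled backoff sketch; the thresholds are arbitrary and pop_with_backoff is a name made up for illustration):

#include <chrono>
#include <thread>

template <typename Queue, typename T>
void pop_with_backoff( Queue& queue, T& value )
{
    int spins = 0;
    while( !queue.try_pop( value ) )
    {
        ++spins;
        if( spins < 100 )
            continue;                  // busy-spin: stay hot while work is imminent
        else if( spins < 200 )
            std::this_thread::yield(); // let other threads run
        else
            std::this_thread::sleep_for( std::chrono::microseconds( 50 ) ); // back off
    }
}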

Namecalling answered 10/5, 2018 at 16:38 Comment(1)
Indeed, creating a pair of SPSC queues between each pair of threads that need to communicate is the easiest approach, and often more performant than a single MPMC queue under high contention. There are reasons you'd need multi-consumer at least, such as job stealing. This was my writeup of this whole rabbit hole if you're at all interested: codersblock.org/blog/2016/6/02/ditching-the-mutexStereotypy

How about this lock-free queue?

It is a memory-ordering-based lock-free queue, but it needs the number of concurrent threads to be pre-set when the queue is initialized.

For example:

int max_concurrent_thread = 16;
lfqueue_t my_queue;

lfqueue_init(&my_queue, max_concurrent_thread);

/** Wrap this scope in other threads **/
int *int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++; /* i is a counter owned by the enqueuing thread */
/* Enqueue */
while (lfqueue_enq(&my_queue, int_data) == -1) {
    printf("ENQ Full ?\n");
}

/** Wrap this scope in other threads **/
/* Dequeue */
while ((int_data = lfqueue_deq(&my_queue)) == NULL) {
    printf("DEQ EMPTY ..\n");
}
// printf("%d\n", *int_data);
free(int_data);
/** End **/

lfqueue_destroy(&my_queue);
Parliamentary answered 30/8, 2018 at 11:16 Comment(0)

On another similar question, I presented a solution to this problem. I believe it is the smallest found so far.

I will not put the same answer here, but the repository has a fully functional C++ implementation of the lock-free queue you desire.

EDIT: Thanks to a code review from @PeterCordes, I found a bug in the solution when using 64-bit templates, but it's working perfectly now.

This is the output I receive when running the tests:

Creating 4 producers & 4 consumers
to flow 10.000.000 items through the queue.

Produced: 10.743.668.245.000.000
Consumed: 5.554.289.678.184.004
Produced: 10.743.668.245.000.000
Consumed: 15.217.833.969.059.643
Produced: 10.743.668.245.000.000
Consumed: 7.380.542.769.600.801
Produced: 10.743.668.245.000.000
Consumed: 14.822.006.563.155.552

Checksum: 0 (it must be zero)
Superabundant answered 7/1, 2020 at 5:35 Comment(15)
Much like my implementation here codersblock.org/blog/2016/6/02/ditching-the-mutex your solution isn't truly lock-free. If you suspended producer threads between acquiring their slot and writing the item, or consumer threads between reading their item and clearing their slot, then it will lock up the data structure.Stereotypy
I think it's technically not safe to have concurrent writers to a std::vector, and even if you switch to a std::array or T[], then if T can't be written atomically then you have a data race there. If T can be written atomically, you could still get a torn read (data race) if the items are not naturally aligned. Other than that it looks ok, I think because you're using seq_cst memory ordering you've saved yourself some headaches with observing the correct side effects. I'd recommend testing it with Relacy though when you think it's safe.Stereotypy
@Joe: Most lockless queues aren't technically lock-free; but in practice perform well. Lock-free Progress Guarantees. You can use atomics to let one thread "claim" a slot in a std::vector. But you're right, this queue doesn't do that, which is a bug. It doesn't distinguish between claimed and finished-writing states, so you could have a writer racing with a reader. There's no atomic operation after buffer[t & mask] = item in github.com/bittnkr/uniq/blob/master/cpp/uniq.h. Testing with sizeof(T)=128 or something should reveal tearing.Longer
@PeterCordes indeed, I agree, most aren't, and true lock-free-ness isn't necessarily better in a real world use-case. Further to the std::vector - I know in reality you're certainly just writing to an array, but I think the standard says you shouldn't have concurrent writers to a std::vector. Anywho, I've personally come a loooong way since I wrote this post, and don't advocate using a multi producer multi consumer queue (though it's a fun exercise). These days I find the best performance in single producer single consumer queues between thread pairs.Stereotypy
@Joe: Each element of a std::vector or array is a different object. It's not a data race to have different threads writing to different elements. Something like push_back that could invalidate iterators (i.e. realloc the array) is not safe to do while there are readers or writers on other elements. The control block is not thread-safe either, so even changing the size (within the already-allocated capacity) in one thread would be a data race with any other thread doing basically anything. But two threads reading the control block as part of operator[] is fine.Longer
@Joe: and yes, contention is a killer, so one SPSC queue per thread could work much better, even if that means one thread iterates over multiple queues trying to consume from them. Or scatters its output into multiple queues.Longer
@PeterCordes ah yes you're right: "[ Note: For a vector<int> x with a size greater than one, x[1] = 5 and *x.begin() = 10 can be executed concurrently without a data race, but x[0] = 5 and *x.begin() = 10 executed concurrently may result in a data race. As an exception to the general rule, for a vector<bool> y, y[0] = true may race with y[1] = true. — end note ]"Stereotypy
@Joe: yup. But unfortunately this queue does appear to have a race on individual elements between writers and readers. (And vice versa, with pop doing buffer[t] = 0; which could step on the data from a push.) And the lack of synchronization would let pop read a claimed but not-yet-written entry, if it goes even earlier than what you pointed out about tearing.Longer
Hi buddies, thanks for the comments. I ask you: please run the code and follow the logic to understand why you can safely use std::vector (or any kind of vector) after the atomic registers are updated. These atomic registers are the turnstile which safely reserves the seat on the vector for the thread that updates it. I don't have much space here, but I can assure you that there are no race conditions there. This is the definitive solution to race conditions. The only moment the threads are suspended is when the queue is full or empty. Please stress it, increasing the threads and reducing the buffer.Superabundant
@PeterCordes: I know that the simplicity of the code makes it appear that there are race conditions, but I can assure you that there are not. That's the beauty of the solution: its simplicity and solidity. Try reducing the buffer to a single position (the worst case for any queue) and note how it performs the same way.Superabundant
Ok, in that case, this sequence of events leads to a race: a writer calls push. It sleeps or stalls after head.compare_exchange_weak(t, t + 1) succeeds, making head=1 globally visible, but before buffer[0] = item happens. Another thread running pop can then see tail != head and read !buffer[0] at the same time another thread is writing it. This is a non-atomic object, so for example with int64_t on a 32-bit machine you could get tearing in practice. (As well as it obviously being ISO C++ UB.)Longer
I hadn't previously noticed that pop spins on the element value; for that to be safe you should make it a vector of std::atomic<T> that you access with memory_order_relaxed. You're already depending on loads/stores to the vector being atomic. !buffer[t & mask] could in theory be hoisted out of the loop when it's the same index every time, but in practice compilers will just reload it because t is reloaded from tail inside the loop. And also updated by reference by tail.compare_exchange_weak(t, t+1); you should hoist the t = tail out of the loop and just use the CAS-fail load.Longer
@Joe: TL:DR: this does have a C++ data race, but it will usually work in practice for T that happens to be atomic in asm, e.g. an int on most machines. It's not even usable for non-numeric types because of the =0, unless you have operator overloads on a class type. I think it can also get stuck if a buffer[t] = 0; steps on a push, then pop will spin-wait forever when it comes back around and tries to read from that element.Longer
@PeterCordes: You are right about the template and buffer[t] = 0;. Instead of using the templated data buffer to check if a slot is free, the code could use a second array of booleans that keeps a record of free seats. I did it this way for simplicity. About your other concern: if the thread is suspended after head.compare_exchange_weak(t, t + 1), the seat is already reserved; no other thread can get it, so the thread can safely use the space kept in the variable t after returning. Please run the code and try to break it to prove the race condition.Superabundant
Let us continue this discussion in chat.Superabundant
