I'm trying to implement a lock free multiple producer, multiple consumer queue in C++11. I'm doing this as a learning exercise, so I'm well aware that I could just use an existing open source implementation, but I'd really like to find out why my code doesn't work. The data is stored in a ringbuffer, apparently it is a "bounded MPMC queue".
I've modelled it pretty closely to what I've read of Disruptor. The thing I've noticed is that it works absolutely fine with a single consumer and single/multiple producers, it's just multiple consumers which seems to break it.
Here's the queue:
template <typename T>
class Queue : public IQueue<T>
{
public:
explicit Queue( int capacity );
~Queue();
bool try_push( T value );
bool try_pop( T& value );
private:
typedef struct
{
bool readable;
T value;
} Item;
std::atomic<int> m_head;
std::atomic<int> m_tail;
int m_capacity;
Item* m_items;
};
template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
for( int i = 0; i < capacity; ++i )
{
m_items[i].readable = false;
}
}
template <typename T>
Queue<T>::~Queue()
{
delete[] m_items;
}
template <typename T>
bool Queue<T>::try_push( T value )
{
while( true )
{
// See that there's room
int tail = m_tail.load(std::memory_order_acquire);
int new_tail = ( tail + 1 );
int head = m_head.load(std::memory_order_acquire);
if( ( new_tail - head ) >= m_capacity )
{
return false;
}
if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
{
// In try_pop, m_head is incremented before the reading of the value has completed,
// so though we've acquired this slot, a consumer thread may be in the middle of reading
tail %= m_capacity;
std::atomic_thread_fence( std::memory_order_acquire );
while( m_items[tail].readable )
{
}
m_items[tail].value = value;
std::atomic_thread_fence( std::memory_order_release );
m_items[tail].readable = true;
return true;
}
}
}
template <typename T>
bool Queue<T>::try_pop( T& value )
{
while( true )
{
int head = m_head.load(std::memory_order_acquire);
int tail = m_tail.load(std::memory_order_acquire);
if( head == tail )
{
return false;
}
int new_head = ( head + 1 );
if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
{
head %= m_capacity;
std::atomic_thread_fence( std::memory_order_acquire );
while( !m_items[head].readable )
{
}
value = m_items[head].value;
std::atomic_thread_fence( std::memory_order_release );
m_items[head].readable = false;
return true;
}
}
}
And here's the test I'm using:
void Test( std::string name, Queue<int>& queue )
{
const int NUM_PRODUCERS = 64;
const int NUM_CONSUMERS = 2;
const int NUM_ITERATIONS = 512;
bool table[NUM_PRODUCERS*NUM_ITERATIONS];
memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));
std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);
std::chrono::system_clock::time_point start, end;
start = std::chrono::system_clock::now();
std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
std::atomic<int> push_count (0);
for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
{
threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
{
int base = thread_id * NUM_ITERATIONS;
for( int i = 0; i < NUM_ITERATIONS; ++i )
{
while( !queue.try_push( base + i ) ){};
push_count.fetch_add(1);
}
});
}
for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
{
threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
{
int v;
while( pop_count.load() > 0 )
{
if( queue.try_pop( v ) )
{
if( table[v] )
{
std::cout << v << " already set" << std::endl;
}
table[v] = true;
pop_count.fetch_sub(1);
}
}
});
}
for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
{
threads[i].join();
}
end = std::chrono::system_clock::now();
std::chrono::duration<double> duration = end - start;
std::cout << name << " " << duration.count() << std::endl;
std::atomic_thread_fence( std::memory_order_acq_rel );
bool result = true;
for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
{
if( !table[i] )
{
std::cout << "failed at " << i << std::endl;
result = false;
}
}
std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}
Any nudging in the right direction would be greatly appreciated. I'm pretty new to memory fences rather than just using a mutex for everything, so I'm probably just fundamentally misunderstanding something.
Cheers J
typedef struct
is a rare sight in C++, since there's no need for it. Simplystruct Item
is fine. – Halliardwhile( m_items[tail].readable )
is very suspicious. Since setting it can be postponed arbitrarily, and it's written to only after the value is set, you could accidentally overwrite a value. – Halliardwhile( m_items[tail].readable )
, it sounds like this could be the root of my problem, but I'd like to fully understand what you mean rather than just implementing a fix without understanding how it works. Thanks for the heads up on typedef struct, I had no idea! Thanks again for replying =) – Stereotypycompare_exchange
basically tells everyone that there's another element to be read from (inpush
) or that has been read from => more capacity (inpop
). So let's assume you're trying to push, but there's only one element capacity left and that last element hasn't yet been read from (popping in progress). [to be continued] – Halliardm_tail
and waits until popping has finished (that's the intent, at least). Another consumer however now can start popping the very same element, since it's there (the producer says so) and it can be read from (since the other consumer has not cleared the flag yet). – Halliard