Looking for the right ring buffer implementation in C

I am looking for a ring buffer implementation (or pseudocode) in C with the following characteristics:

  • multiple producer single consumer pattern (MPSC)
  • consumer blocks on empty
  • producers block on full
  • lock-free (I expect high contention)

So far I've been working only with SPSC buffers - one per producer - but I would like to avoid the continuous spinning of the consumer to check for new data over all its input buffers (and maybe to get rid of some marshaling threads in my system).

I develop for Linux on Intel machines.

Slink answered 4/9, 2012 at 13:5 Comment(9)
I don't know what environment you're in, but in Win32 you can use WaitForMultipleObjects to have the consumer wait on all queues at once without spinning.Flack
I am sorry, I didn't mention that I mainly work on LinuxSlink
Just in case you won't get a real answer, it'll be a breeze to sync multiple buffers with this: neosmart.net/blog/2011/…Flack
What are you going to queue? Can we assume they're pointers? If you were using spinners before, I don't think the lock-free requirement is actually much of a requirement - performance would have been so bad before that locking is probably a non-issue. A futex lock while pushing/popping a pointer is not exactly a huge performance hit. Can you get away with unbounded queues, eg. by using a *buffer pool? There are lots of examples of unbounded P-C queues on the net, mostly using condvars to wait on.Bisutun
I always pass pointers. On one hand I would like to stop spinning to preserve CPU time, on the other I would like to be sure that locking is not a problem when moving from multiple SPSC buffers to a single MPSC one. So far, I've been pre-allocating resources at initialization time, so I do not think I can get away with unbounded structures.Slink
If you are pre-allocating resources, you CAN get away with unbounded queues - and it's easier. If you malloc/calloc 256 *buffers at startup, shove them onto a queue and use only those buffers for inter-thread comms, (returning 'used' *buffers back to the pool queue), you know for sure that any queue in the system cannot ever hold more than 256 pointers, so you can just use a *buffer[256] circular-indexed array for the storage in each queue, knowing that it can never overflow, (unless you screw up, of course, eg. by double-posting a pointer back to the pool:).Bisutun
I see what you mean. What would you use to synchronize the writers ? You talked about futex before, but it is my understanding that pthreads under Linux should be already taking advantage of them.Slink
Take a look at this article, "Port Windows IPC apps to Linux" (ibm.com/developerworks/linux/library/l-ipc2lin3/index.html), which describes how the different synchronization primitives compare between Windows and Linux. It is a multipart article. I would expect that some kind of locking will be needed to ensure that only one process or thread at a time has access to the management data structures for the queue between producers and consumer.Ahvenanmaa
And here is an interesting article on Dr. Dobb's about "Use Critical Sections (Preferably Locks) to Eliminate Races" drdobbs.com/cpp/use-critical-sections-preferably-locks-t/…Ahvenanmaa
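
The pool idea from the comments above can be sketched roughly as follows. This is a minimal illustration, not code from any commenter: the names ptr_queue, ptr_queue_push, ptr_queue_pop and the POOL_SIZE constant are hypothetical, and it assumes that at most POOL_SIZE pointers are ever in circulation, so a push can never find the array full.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define POOL_SIZE 256  /* hypothetical: number of buffers allocated at startup */

/* Bounded pointer queue guarded by a mutex; the consumer sleeps on a condvar. */
struct ptr_queue {
    void *slots[POOL_SIZE];
    size_t head;   /* index of the next pointer to pop */
    size_t count;  /* number of queued pointers */
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
};

void ptr_queue_init(struct ptr_queue *q)
{
    q->head = 0;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

/* Never waits for space: the pool bounds how many pointers can exist. */
void ptr_queue_push(struct ptr_queue *q, void *item)
{
    pthread_mutex_lock(&q->lock);
    assert(q->count < POOL_SIZE);  /* trips only on a double-post */
    q->slots[(q->head + q->count) % POOL_SIZE] = item;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Blocks (without spinning) until a pointer is available. */
void *ptr_queue_pop(struct ptr_queue *q)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    void *item = q->slots[q->head];
    q->head = (q->head + 1) % POOL_SIZE;
    q->count--;
    pthread_mutex_unlock(&q->lock);
    return item;
}
```

With this shape, one ptr_queue would hold the free buffers and another would carry filled buffers to the consumer. A producer pops a free buffer from the pool queue first, so it sleeps there when every buffer is in flight, which gives the "block on full" behaviour without a separate full check.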
M
3

I think I have what you are looking for. It is a lock-free ring buffer implementation that blocks the producer and the consumer. You only need access to atomic primitives; in this example I will use GCC's __sync builtins.

It has a known bug - if you overflow the buffer by more than 100% it is not guaranteed that the queue remains FIFO (it will still process them all eventually).

This implementation relies on reads and writes of the buffer elements being atomic operations (which is pretty much guaranteed for pointers).

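#include <stdint.h>
#include <stdlib.h>
#include <semaphore.h>

struct ringBuffer
{
   void** buffer;           //slots; NULL means "empty"
   uint64_t writePosition;  //ever-increasing counter shared by all producers
   size_t size;
   sem_t* semaphore;        //counts items the consumer may take
};

//create the ring buffer
struct ringBuffer* createBuffer(size_t bufferSize)
{
   struct ringBuffer* buf = calloc(1, sizeof(struct ringBuffer));
   buf->buffer = calloc(bufferSize, sizeof(void*));
   buf->size = bufferSize;
   buf->semaphore = malloc(sizeof(sem_t));
   sem_init(buf->semaphore, 0, 0);
   return buf;
}

//producer (newValue must not be NULL, because NULL marks an empty slot)
void addToBuffer(void* newValue, struct ringBuffer* buf)
{
   uint64_t writePos = __sync_fetch_and_add(&buf->writePosition, 1) % buf->size;

   //spin until the claimed slot is empty, then publish the value
   while(!__sync_bool_compare_and_swap(&(buf->buffer[writePos]), NULL, newValue));
   sem_post(buf->semaphore);
}

//consumer
void processBuffer(struct ringBuffer* buf)
{
   uint64_t readPos = 0;
   while(1)
   {
       sem_wait(buf->semaphore);

       //a producer holding a later slot may have posted first, so wait
       //(briefly) until this slot is filled; the CAS acts as an atomic read
       void** slot = &buf->buffer[readPos % buf->size];
       void* item;
       while((item = __sync_val_compare_and_swap(slot, NULL, NULL)) == NULL);

       //process item here
       (void)item;
       *slot = NULL;
       readPos++;
   }
}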
Multifoliate answered 5/9, 2012 at 5:47 Comment(6)
This example is interesting. Let me see if I understood correctly: the index increment and the write are lock-free, while you use a lock in the form of a semaphore to block the consumer when there is nothing to consume. I don't understand how it is possible to overflow this buffer, though. Moreover, did you use this structure in a system where you expected a very short spinning time? What was the impact of the spinning loop?Slink
You can overflow the buffer by writing in data faster than it can be processed. Eventually the write index will come around and "pass" the reader. At this point, the writer has to wait in a spin lock while it waits for the reader to catch up (otherwise it would be overwriting data in the buffer). The bug occurs if you overflow the queue by more than 100%. In this scenario, you have more than 1 thread waiting in a spinlock for the buffer to become available. It is not guaranteed which of the threads will write to the queue first.Multifoliate
Wouldn't it be simpler to rewrite the above loop as follows? while(1) { while(__sync_bool_compare_and_swap(&(buf->buffer[writePosition]), NULL, newValue) == false); sem_post(buf->semaphore); break; }Slink
Yeah. I guess. I don't even use a spinlock in my code - if I overflow the buffer I just throw up a lock over everything, and increase the size of the buffer (by a factor of 10).Multifoliate
Actually with that method you don't even need the outer while(1) loop.Multifoliate
As for spinning performance - it shouldn't be an issue. The idea is that your buffer is large enough so that it is never full, and thus you will never spin. The spin lock is only a safety so that overflowing the buffer does not cause undefined behaviour.Multifoliate
A
4

See liblfds, which provides a lock-free MPMC ringbuffer. It won't block at all; lock-free data structures don't tend to, because the point of being lock-free is to avoid blocking. You need to handle this yourself when the data structure comes back to you with a NULL. It returns NULL if you try to read on empty, but it doesn't match your requirement when writing on full: in that case it will throw away the oldest element and give you that for your write.

However, it would only take a small modification to obtain that behaviour.

But there may be a better solution. The tricky part of a ringbuffer is, when it is full, getting the oldest element and re-using it. You don't need this. I think you could take the SPSC circular buffer that uses only memory barriers and rewrite it using atomic operations. That will be a lot more performant than the MPMC ringbuffer in liblfds (which is a combination of a queue and a stack).

Advertence answered 7/9, 2012 at 7:39 Comment(2)
So far, my SPSC implementation is fairly trivial: it relies only on local and global position counters to synchronize the writer and the reader (the local counters are there for batching the push/pull of elements and reducing false sharing). Condition variables are in place to reduce spinning (if no data is available there is nothing else to do; if the destination is full, backpressure is unavoidable). Without proper memory barriers, my implementation won't work on another architecture. Could you please elaborate on your last point? In the end, the ring buffer will always be SPSC, right?Slink
There is a well-known SPSC circular buffer, used for example in the Linux kernel, which uses memory barriers only and returns NULL when the buffer is full or empty. I'm guessing it could be made MPMC by using atomic operations.Advertence
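
For reference, the kind of barrier-only SPSC circular buffer mentioned here might look roughly like the sketch below. It is not the Linux kernel's code and it is not the MPMC conversion guessed at above; it only shows the single-producer, single-consumer starting point, written with C11 atomics. The names spsc_ring, spsc_push, spsc_pop and RING_SIZE are hypothetical, and push reports a full ring with -1 instead of NULL.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define RING_SIZE 8  /* hypothetical capacity */

static_assert((RING_SIZE & (RING_SIZE - 1)) == 0,
              "RING_SIZE must be a power of two");

/* Must start zeroed, e.g. as a static object. */
struct spsc_ring {
    void *slots[RING_SIZE];
    _Atomic size_t head;  /* written only by the producer */
    _Atomic size_t tail;  /* written only by the consumer */
};

/* Producer side: returns 0 on success, -1 if the ring is full. */
int spsc_push(struct spsc_ring *r, void *item)
{
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head - tail == RING_SIZE)
        return -1;
    r->slots[head & (RING_SIZE - 1)] = item;
    /* Release: the slot write becomes visible before the new head. */
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return 0;
}

/* Consumer side: returns NULL if the ring is empty. */
void *spsc_pop(struct spsc_ring *r)
{
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (head == tail)
        return NULL;
    void *item = r->slots[tail & (RING_SIZE - 1)];
    /* Release: the slot read completes before the slot is handed back. */
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return item;
}
```

Each index has exactly one writer, which is why plain loads and stores with acquire/release ordering are enough here. With several producers, the head update would need an atomic read-modify-write (for example a compare-and-swap) plus some way to tell the consumer that a claimed slot has actually been filled.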