C/C++ Lock-free (or non-blocking) Ring Buffer that OVERWRITES the oldest data?

I'm trying to find a lock-free or non-blocking way to implement a ring buffer for a single producer / single consumer that overwrites the oldest data in the buffer. I've read a lot of lock-free algorithms that work when you "return false" if the buffer is full (i.e., don't add), but I can't find even pseudo-code that talks about how to do it when you need to overwrite the oldest data.

I am using GCC 4.1.2 (a restriction at work, I can't upgrade the version...) and I have the Boost libraries. In the past I made my own Atomic< T > variable type that follows the upcoming specification pretty closely (it's not perfect, but it is thread-safe and does what I need).

When I thought about it, I figured using these atomics should really take care of the problem. Some rough pseudo-code as to what I was thinking:

template <typename T, unsigned int Size>
class RingBuffer {
private:
    Atomic<unsigned int> readIndex;
    Atomic<unsigned int> writeIndex;
    enum Capacity { size = Size };
    T* buf;

    unsigned int getNextIndex(unsigned int i)
    {
        return (i + 1) % size;
    }

public:
    // One slot always stays empty, so the buffer holds at most size - 1 items.
    RingBuffer() : readIndex(0), writeIndex(0), buf(new T[size]) {}
    ~RingBuffer() { delete[] buf; }

    void produce(const T& t)
    {
        if (getNextIndex(writeIndex) == readIndex) // 1: buffer is full
        {
            readIndex = getNextIndex(readIndex);   // 2: drop the oldest item
        }
        buf[writeIndex] = t;
        writeIndex = getNextIndex(writeIndex);     // 3
    }

    bool consume(T& t)
    {
        if (readIndex == writeIndex)               // 4: buffer is empty
            return false;
        t = buf[readIndex];
        readIndex = getNextIndex(readIndex);       // 5
        return true;
    }
};

As far as I can tell, there are no deadlock situations here, so we're safe from that (if my implementation above is wrong even at the pseudo-code level, constructive criticism is always appreciated). However, the BIG race condition I can find is:

Let's assume the buffer is full, that is, writeIndex + 1 == readIndex (modulo size). The producer evaluates (1), which is true, just as consume() is being called. In the consumer, (4) is false, so it moves on to read from the buffer; (5) occurs and readIndex is advanced by one, so there is, in fact, space in the buffer now. Then (2) occurs in the producer, advancing readIndex AGAIN and thus LOSING a value.
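
The interleaving above can be replayed deterministically in a single thread. The following is only a sketch: plain integers stand in for the Atomic<unsigned int> indices, the buffer has 4 slots (so 3 items fill it), and the names RaceReplay and replay_lost_value are made up for illustration.

```cpp
// Single-threaded replay of the race; no real concurrency is involved.
struct RaceReplay {
    static const unsigned size = 4;   // one slot stays empty, so 3 items = full
    int buf[size];
    unsigned readIndex;
    unsigned writeIndex;

    static unsigned next(unsigned i) { return (i + 1) % size; }
};

// Stores the consumer's first read in firstRead and returns what the
// consumer would see on its second read.
inline int replay_lost_value(int& firstRead) {
    RaceReplay r;
    r.buf[0] = 10; r.buf[1] = 20; r.buf[2] = 30; r.buf[3] = 0;
    r.readIndex = 0;
    r.writeIndex = 3;                                  // full: next(3) == 0

    // Producer: step (1) sees a full buffer, then gets preempted.
    bool producerSawFull = (RaceReplay::next(r.writeIndex) == r.readIndex);

    // Consumer runs completely between (1) and (2).
    firstRead = r.buf[r.readIndex];                    // (4) passed, copies 10
    r.readIndex = RaceReplay::next(r.readIndex);       // (5) readIndex becomes 1

    // Producer resumes with its stale decision.
    if (producerSawFull)
        r.readIndex = RaceReplay::next(r.readIndex);   // (2) readIndex becomes 2
    r.buf[r.writeIndex] = 40;
    r.writeIndex = RaceReplay::next(r.writeIndex);     // (3) writeIndex becomes 0

    return r.buf[r.readIndex];                         // 30: the 20 was skipped
}
```

In this replay the consumer receives 10 and then 30; the 20 is never delivered even though it was not overwritten.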

Basically, it's the classic problem of the writer having to modify the reader's index, causing a race condition. Without actually blocking the entire buffer every time I access it, I can't think of a way to prevent this from happening. What am I missing??

Reify answered 10/12, 2010 at 4:42 Comment(3)
I don't think a lock free algorithm for this exists. The reason lock free producer/consumer patterns work is because the producers and consumers need never touch the same data. But what you propose intimately ties the two together which is going to require some form of lock.Regan
You definitely can't deadlock: you don't have any locks. :-D That said, I think a lock-free algorithm for this would be difficult at best. You need modifications to both of the indices to be atomic together, so you'd really need an atomic<pair<int, int>>. You also need to be able to publish and consume the element data atomically which is impossible to do lock-free in the general case and even for specific cases, it's impossible to modify both pointers and an element's data all together atomically without some kind of lock. Your algorithm has numerous race conditions.Collectanea
Thanks for the comments guys. Yeah, looks like my intuition was right...trying to do a non-blocking version of it isn't going to work very easily.Reify
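
One way to read the atomic<pair<int, int>> suggestion in the comments is to pack both indices into a single 64-bit word and update them with one compare-and-swap. The sketch below assumes C++11 std::atomic (not available on GCC 4.1.2) and uses hypothetical names (Indices, pack, unpack, reserve_overwriting). It only makes the two index updates atomic together; it does not solve publishing or reading the element data, which the comment singles out as the harder part.

```cpp
#include <atomic>
#include <cstdint>

// Both indices share one 64-bit word: high half = read, low half = write.
struct Indices { std::uint32_t read; std::uint32_t write; };

inline std::uint64_t pack(Indices i) {
    return (static_cast<std::uint64_t>(i.read) << 32) | i.write;
}

inline Indices unpack(std::uint64_t w) {
    Indices i = { static_cast<std::uint32_t>(w >> 32),
                  static_cast<std::uint32_t>(w) };
    return i;
}

// Producer side: advance write and, if the buffer is full, advance read
// in the same CAS so the two can never be observed out of step.
// Returns the slot index that was reserved for the new element.
inline std::uint32_t reserve_overwriting(std::atomic<std::uint64_t>& state,
                                         std::uint32_t size) {
    std::uint64_t old = state.load(std::memory_order_acquire);
    for (;;) {
        Indices cur = unpack(old);
        Indices nxt = cur;
        nxt.write = (cur.write + 1) % size;
        if (nxt.write == cur.read)                 // full: drop the oldest item
            nxt.read = (cur.read + 1) % size;
        if (state.compare_exchange_weak(old, pack(nxt),
                                        std::memory_order_acq_rel,
                                        std::memory_order_acquire))
            return cur.write;
        // A failed CAS reloads old with the current value; just retry.
    }
}
```

A consumer would use the same pattern to advance only the read half, which is why a stale decision like the one in step (2) cannot be applied to a newer state.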
P
7
  1. Start with a single producer/multiple consumer queue with appropriate progress guarantees.
  2. If the queue is full and the push would fail, pop one value. Then there will be space to push the new value.
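
The two steps above can be sketched as a small wrapper around any queue that offers non-blocking try_push and try_pop. Everything here is hypothetical: ToyBoundedQueue is a non-thread-safe stand-in used only to show the interface, and a real implementation would substitute a single-producer/multiple-consumer lock-free queue, since the producer now also acts as a consumer. The sketch assumes a capacity greater than zero and a default-constructible T.

```cpp
#include <cstddef>
#include <deque>

// Stand-in bounded queue. NOT thread-safe; it only models the interface.
template <typename T>
class ToyBoundedQueue {
public:
    explicit ToyBoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    bool try_push(const T& v) {
        if (items_.size() == capacity_) return false;   // full: refuse
        items_.push_back(v);
        return true;
    }

    bool try_pop(T& out) {
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
};

// Push that never fails: when the queue is full, discard the oldest value
// and try again. The loop also covers the case where a consumer empties a
// slot first and the producer's own pop finds nothing to remove.
template <typename Queue, typename T>
void push_overwriting(Queue& q, const T& v) {
    while (!q.try_push(v)) {
        T dropped;
        q.try_pop(dropped);
    }
}
```

With a capacity of 3, pushing 1 through 5 this way leaves 3, 4 and 5 in the queue.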
Phosphatase answered 10/12, 2010 at 5:18 Comment(4)
interesting, however I see a potential race condition in the pop/push bit --> you need to loop until the push succeeds in case another thread pushes after you popped. Very clever solution anyway!Auroora
Thanks, this really helped me think it through in a much cleaner way. Thanks a lot to all who posted!Reify
@Matthieu: That would be a concern in the multiple-producer case, however this is a single producer so no other thread can push.Phosphatase
right, I missed that. I would still use the loop though, just so that the container may be reused in a multiple-producer context.Auroora
O
1

What am I missing??

Lots:

  • say you consume a t while it's being overwritten by the producer - how're you detecting/handling that?
    • many options - e.g. do { copy the value out; check copy has integrity using modification sequence num etc. } while (corrupt)
  • using atomic numbers isn't enough - you also need to use CAS-style loops to perform the index increments (though I do assume you know that, given you say you've read extensively on this already)
  • memory barriers
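
The "copy the value out, then check a modification sequence number" option from the first bullet is commonly done with a seqlock-style slot. Below is a minimal sketch for a single writer, assuming C++11 std::atomic; SeqSlot and Sample are hypothetical names, and the payload is kept in relaxed atomics so that a concurrent read and write is not a data race in the formal sense.

```cpp
#include <atomic>

struct Sample { int a; int b; };

class SeqSlot {
public:
    SeqSlot() : seq_(0), a_(0), b_(0) {}

    // Single writer only. An odd sequence number means "write in progress".
    void write(const Sample& s) {
        unsigned s0 = seq_.load(std::memory_order_relaxed);
        seq_.store(s0 + 1, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_release);
        a_.store(s.a, std::memory_order_relaxed);
        b_.store(s.b, std::memory_order_relaxed);
        seq_.store(s0 + 2, std::memory_order_release);   // even: stable again
    }

    // Copy the payload out and retry if the writer was active meanwhile.
    Sample read() const {
        Sample out;
        unsigned before, after;
        do {
            before = seq_.load(std::memory_order_acquire);
            out.a = a_.load(std::memory_order_relaxed);
            out.b = b_.load(std::memory_order_relaxed);
            std::atomic_thread_fence(std::memory_order_acquire);
            after = seq_.load(std::memory_order_relaxed);
        } while ((before & 1u) || before != after);
        return out;
    }

private:
    std::atomic<unsigned> seq_;
    std::atomic<int> a_, b_;
};
```

The reader never blocks the writer, but it can spin while writes keep arriving, so this gives the writer progress at the reader's expense.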

But, let's write that off as being below your pseudo-code level, and consider your explicit question:

  • point (5) will actually require a CAS operation. If readIndex was correctly sampled/copied at the top of consume() - before the (possibly corrupt) t was copied - then the CAS instruction will fail if readIndex has already been incremented by the producer. Instead of the usual resample-and-retry CAS loop, just carry on without retrying: the index has already been advanced for you.
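
A sketch of what step (5) might look like with a CAS, assuming C++11 std::atomic rather than the asker's own Atomic< T >; finish_consume is a hypothetical helper name. The consumer passes in the index it sampled at the top of consume(), and the function advances readIndex only if nobody else has moved it since.

```cpp
#include <atomic>

// Returns true if this call advanced readIndex. Returns false if the
// producer already advanced it, in which case the consumer must NOT
// advance it again (and should treat the value it copied as suspect).
inline bool finish_consume(std::atomic<unsigned>& readIndex,
                           unsigned sampled,
                           unsigned size) {
    unsigned expected = sampled;
    return readIndex.compare_exchange_strong(expected,
                                             (sampled + 1) % size,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
}
```

This removes the double advance from the question's race, but whether the copied element is old, new or torn still has to be settled separately, for example with a sequence number.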
Oath answered 10/12, 2010 at 5:12 Comment(3)
Hi Tony, apologies for the bad code / pseudo-code. Just to make sure, wouldn't I need a CAS operation for (2) as well?Reify
Also, for the overwriting condition, I believe that I can do away with that problem if the buffer was actually filled with Atomic< T > instead of T itself. Since access would be guarded, that should solve the issue?Reify
@J-16 SDiZ: in the code, one of the lines is designated "// 5". @Nik: Yes - the first few issues I mention are systemic, and not limited to point (5). CAS should be used in the implementation of getNextIndex, and at point 5 where getNextIndex isn't appropriate as explained above. Even if T is atomic, how will you know if you're reading an old one or a new one? You need to think about that harder. It's also a point where you have to be very careful with your memory barriers. I've given some hints, but the exact answer can be tuned for your data type, or made generic but slower.Oath
C
0

Here is the code for a circular buffer built on atomic variables that I created recently. I have modified it to overwrite data instead of returning false. Disclaimer: it has not been tested to production grade yet.

#include <atomic>

template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of circular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t{0}, h{0}, ph{0}, wh{0};  // all indices start at 0
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

public:
  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};
Cavetto answered 17/6, 2016 at 14:31 Comment(0)
