So, I've written a queue, after a bit of research. It uses a fixed-size buffer, so it's a circular queue. It has to be thread-safe, and I've tried to make it lock-free. I'd like to know what's wrong with it, because these kinds of things are difficult to predict on my own.
Here's the header:
template <class T>
class LockFreeQueue
{
public:
    LockFreeQueue(uint buffersize)
        : buffer(NULL), ifront1(0), ifront2(0), iback1(0), iback2(0), size(buffersize)
    { buffer = new atomic <T>[buffersize]; }
    ~LockFreeQueue(void) { if (buffer) delete[] buffer; }
    bool pop(T* output);
    bool push(T input);
private:
    uint incr(const uint val)
    {return (val + 1) % size;}
    atomic <T>* buffer;
    atomic <uint> ifront1, ifront2, iback1, iback2;
    uint size;
};
And here's the implementation:
template <class T>
bool LockFreeQueue<T>::pop(T* output)
{
    while (true)
    {
        /* Fetch ifront1 and store it in i. */
        uint i = ifront1;
        /* If ifront1 == iback2, the queue is empty. */
        if (i == iback2)
            return false;
        /* If i still equals ifront1, increment ifront1. */
        /* Incrementing ifront1 notifies other pop() calls that they can read the next element. */
        if (ifront1.compare_exchange_weak(i, incr(i)))
        {
            /* Then fetch the output. */
            *output = buffer[i];
            /* Incrementing ifront2 notifies push() that it's safe to write. */
            ++ifront2;
            return true;
        }
        /* If i no longer equals ifront1, we loop around and try again. */
    }
}
template <class T>
bool LockFreeQueue<T>::push(T input)
{
    while (true)
    {
        /* Fetch iback1 and store it in i. */
        uint i = iback1;
        /* If ifront2 == (iback1 + 1), the queue is full. */
        if (ifront2 == incr(i))
            return false;
        /* If i still equals iback1, increment iback1. */
        /* Incrementing iback1 notifies other push() calls that they can write a new element. */
        if (iback1.compare_exchange_weak(i, incr(i)))
        {
            /* Then store the input. */
            buffer[i] = input;
            /* Incrementing iback2 notifies pop() that it's safe to read. */
            ++iback2;
            return true;
        }
        /* If i no longer equals iback1, we loop around and try again. */
    }
}
EDIT: I made some major modifications to the code, based on comments (Thanks KillianDS and n.m.!). Most importantly, ifront and iback are now ifront1, ifront2, iback1, and iback2. push() will now increment iback1, notifying other pushing threads that they can safely write to the next element (as long as the queue isn't full), write the element, then increment iback2. iback2 is the only back index that pop() checks. pop() does the same thing, but with ifront1 and ifront2.
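To make it easier to see what each index holds under this scheme, here is a minimal single-threaded sketch of the bookkeeping alone. IndexModel and spurious_pop_after are hypothetical names, the data buffer is left out, and plain stores stand in for the compare-exchange because only one thread runs. It deliberately keeps the same mix as the code above: incr() for ifront1 and iback1, ++ for ifront2 and iback2.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the queue's four indices. The data buffer is left
// out and plain stores replace the CAS, because this runs on one thread only.
struct IndexModel {
    unsigned size;
    std::atomic<unsigned> ifront1{0}, ifront2{0}, iback1{0}, iback2{0};

    explicit IndexModel(unsigned n) : size(n) {}

    unsigned incr(unsigned val) const { return (val + 1) % size; }

    bool push() {
        unsigned i = iback1;
        if (ifront2 == incr(i)) return false;  // looks full
        iback1 = incr(i);                      // claim: wraps modulo size
        ++iback2;                              // commit: plain increment, no wrap
        return true;
    }

    bool pop() {
        unsigned i = ifront1;
        if (i == iback2) return false;         // looks empty
        ifront1 = incr(i);                     // claim: wraps modulo size
        ++ifront2;                             // commit: plain increment, no wrap
        return true;
    }
};

// Pushes and pops `rounds` elements one at a time, then reports whether one
// more pop() on the now-empty queue still claims to have found an element.
bool spurious_pop_after(unsigned size, unsigned rounds) {
    IndexModel q(size);
    for (unsigned k = 0; k < rounds; ++k) {
        if (!q.push() || !q.pop()) return false;
    }
    return q.pop();
}
```

With size 4, three push/pop rounds leave the model reporting empty, but after the fourth round iback2 is 4 while ifront1 has wrapped back to 0, so spurious_pop_after(4, 4) returns true. The wrapped and unwrapped indices stop being comparable once the buffer has been used all the way around once.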
Now, once again, I fall into the trap of "this SHOULD work...", but I don't know anything about formal proofs or anything like that. At least this time, I can't think of a potential way that it could fail. Any advice is appreciated, except for "stop trying to write lock-free code".
In isEmpty(), "if (ifront == iback) return true; else return false;" should just be "return ifront == iback;". Same in isFull(). Avoid the extra if-else. – Torrence
The compare_and_exchange in push both notifies writers that the element may not be written to any more and readers that the element can now be read from. There's no guarantee the atomic write to buffer[i] occurs before the read in the first scenario. – Gehenna
incr can be simplified to "return (val+1)%size". It does not change anything, but less code is easier to grasp in concurrency. – Polysemy
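One way to address the ordering concern raised in the comments is to have every thread publish in the same order in which it claimed its slot. What follows is a rough sketch under that assumption, not the posted code: OrderedCommitQueue and its member names are hypothetical, the counters are never wrapped (the slot is taken modulo size only when indexing), and the default sequentially consistent atomics are used throughout. The wait loops mean that a thread stalled between claiming and publishing holds up the threads behind it, so this variant is not strictly lock-free.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical variant of the queue, written for illustration only.
template <class T>
class OrderedCommitQueue {
public:
    explicit OrderedCommitQueue(std::size_t capacity)
        : buffer(capacity), size(capacity) {}

    bool push(const T& input) {
        while (true) {
            // Read doneFront first: a stale value can only err towards "full".
            std::size_t front = doneFront.load();
            std::size_t i = claimBack.load();
            // Counters never wrap, so their difference is the element count.
            if (i - front >= size) return false;
            if (claimBack.compare_exchange_weak(i, i + 1)) {
                buffer[i % size] = input;
                // Publish in claim order: wait for every earlier pusher.
                while (doneBack.load() != i) std::this_thread::yield();
                doneBack.store(i + 1);
                return true;
            }
        }
    }

    bool pop(T* output) {
        while (true) {
            std::size_t i = claimFront.load();
            if (i == doneBack.load()) return false;  // nothing published yet
            if (claimFront.compare_exchange_weak(i, i + 1)) {
                *output = buffer[i % size];
                // Free slots in claim order: wait for every earlier popper.
                while (doneFront.load() != i) std::this_thread::yield();
                doneFront.store(i + 1);
                return true;
            }
        }
    }

private:
    std::vector<T> buffer;  // plain T: the four counters order all accesses
    std::size_t size;
    std::atomic<std::size_t> claimFront{0}, doneFront{0}, claimBack{0}, doneBack{0};
};
```

Because doneBack only advances past a slot after that slot and every earlier one have been written, a pop() that sees doneBack greater than its claimed index cannot read an element that is still being stored. The price is the spin in each commit step.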