Does an optimistic lock-free FIFO queue implementation exist?
Asked Answered
H

5

10

Is there any C++ implementation (source codes) of "optmistic approach to lock-free FIFO queues" algorithm?

Hypnotic answered 31/5, 2010 at 18:26 Comment(0)
A
11

Herb Sutter covered just such a queue as part of his Effective Concurency column in Dr. Dobbs Journal.

Writing Lock-Free Code: A Corrected Queue

Airel answered 31/5, 2010 at 18:58 Comment(9)
that is theory, what i'am asking is the implementation. is there any source codes or library that implement those algorithm?Hypnotic
Did you even read the article? All of page 2 is annotated source code.Airel
ok, sorry about that, i'am expecting it is wrapped as library or something... so I just include the source to my project and use it. is there any benchmark of this algorithm compared to the paper I refer above?Hypnotic
Compared to, no. But there are benchmarks in his follow-up articles, linked from his website: EC #16 through EC #18.Airel
do you have those implementation of atomic<T> ?Hypnotic
No, that's something that's in C++0x. You can find libraries that have a similar type, or you can roll your own. Setting an atomic<> is a simple matter of a compare-and-swap operation, which is often provided as a compiler intrinsic.Airel
This question has some discussion on writing atomic<> and similar functions: #1158874Airel
thx, I ended up writing my own atomic<>, I have one more question about CACHE_LINE_SIZE on my new post below..Hypnotic
Unfortunately Page 2 cannot be accessed due to some problem on drdoobs web page.Crescantia
H
4

I want to conclude the answer given by greyfade, which is based on http://www.drdobbs.com/high-performance-computing/212201163 (the last part of the article), the optimized code would be (with some modification to suit my naming and coding convention) : `

template <typename T> class LFQueue {
private:
    struct LFQNode {
        LFQNode( T* val ) : value(val), next(nullptr) { }
        T* value;
        AtomicPtr<LFQNode> next;
        char pad[CACHE_LINE_SIZE - sizeof(T*) - sizeof(AtomicPtr<LFQNode>)];
    };

    char pad0[CACHE_LINE_SIZE];
    LFQNode* first;                 // for one consumer at a time
    char pad1[CACHE_LINE_SIZE - sizeof(LFQNode*)];
    InterlockedFlag consumerLock;   // shared among consumers
    char pad2[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    LFQNode* last;                  // for one producer at a time
    char pad3[CACHE_LINE_SIZE - sizeof(LFQNode*)];
    InterlockedFlag producerLock;   // shared among producers
    char pad4[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
public:
    LFQueue() {
        first = last = new LFQNode( nullptr ); // no more divider
        producerLock = consumerLock = false;
    }

    ~LFQueue() {
        while( first != nullptr ) {
            LFQNode* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    bool pop( T& result ) {
        while( consumerLock.set(true) ) 
        { }                             // acquire exclusivity
        if( first->next != nullptr ) {  // if queue is nonempty 
            LFQNode* oldFirst = first;
            first = first->next;
            T* value = first->value;    // take it out
            first->value = nullptr;     // of the Node
            consumerLock = false;       // release exclusivity
            result = *value;            // now copy it back
            delete value;               // and clean up
            delete oldFirst;            // both allocations
            return true;                // and report success
        }
        consumerLock = false;           // release exclusivity
        return false;                   // queue was empty
    }

    bool push( const T& t )  {
        LFQNode* tmp = new LFQNode( t );    // do work off to the side
        while( producerLock.set(true) ) 
        { }                             // acquire exclusivity
        last->next = tmp;               // A: publish the new item
        last = tmp;                     // B: not "last->next"
        producerLock = false;           // release exclusivity
        return true;
    }
};

`

another question is how do you define CACHE_LINE_SIZE? its vary on ever CPUs right?

Hypnotic answered 1/6, 2010 at 0:13 Comment(6)
A good number to choose would be 64 bytes, I think. But you'll probably want to balance it with size, so I'd suggest looking at your target CPUs and choose a size that fits the most common ones you expect to target.Airel
Just a quick note: this is not a forum, so people can't be assume to "browse the thread". If you wish to ask another question, you should better use the "Ask Question" field rather than the "Your Answer" one.Grecize
I'am indeed re-answering the question, but i was wrong asking in the answer field, I should add new comment under my own new answer. sorry about that.Hypnotic
I'am done benchmarking the above code against std::queue with CRITICAL_SECTION lock in windows, the lock-free queue is actually 2~3 times slower than implementation of std::queue with lock. do you know why? it is because of linked list?Hypnotic
sharing benchmark code including what system you're running would be useful here. Also what is the intended usage in your app, that's what matters.Moule
Ouch. The cache line alignment hack is ugly. What happens when your code runs on a CPU with a different cache arrangement? also, that struct is consuming a LOT of cache. L1 cache is a scarce resource. I can understand it being done, otherwise you physical bond entities which are logically separate, but still - ouch. Expensive.Subsidize
L
1

Here is my implementation of a lock-free FIFO.

Make sure each item of T is a multiple of 64 bytes (the cache line size in the Intel CPUs) to avoid false sharing.

This code compiles with gcc/mingw and should compile with clang. It's optimized for 64-bit, so to get it to run on 32-bit would need some refactoring.

https://github.com/vovoid/vsxu/blob/master/engine/include/vsx_fifo.h

vsx_fifo<my_struct, 512> my_fifo;

Sender:

my_struct my_struct_inst;
... fill it out ...
while (!my_fifo.produce(my_struct_inst)) {}

Receiver:

my_struct my_struct_recv;
while(my_fifo.consume(my_struct_recv)) 
{ 
  ...do stuff...
}
Lipinski answered 13/10, 2014 at 21:42 Comment(0)
G
1

How about this lfqueue

This is cross-platform, unlimited enqueue thread safety queue, have been tested multi deq, multi enq-deq and multi enq. Guarantee memory safe.

For example

int* int_data;
lfqueue_t my_queue;

if (lfqueue_init(&my_queue) == -1)
    return -1;

/** Wrap This scope in other threads **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
 while (lfqueue_enq(&my_queue, int_data) == -1) {
    printf("ENQ Full ?\n");
}

/** Wrap This scope in other threads **/
/*Dequeue*/
while  ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
    printf("DEQ EMPTY ..\n");
}

// printf("%d\n", *(int*) int_data );
free(int_data);
/** End **/

lfqueue_destroy(&my_queue);
Gourmand answered 10/9, 2018 at 1:8 Comment(0)
M
0

If you're looking for a good lock free queue implementation both Microsoft Visual Studio 2010 & Intel's Thread Building Blocks contain a good LF queue which is similar to the paper.

Here's a link to the one in VC 2010

Moule answered 31/5, 2010 at 22:46 Comment(1)
I try the vs2010 one and benchmarked, it is faster than "std::queue with lock" on small data sets, but exponentialy slower on large datasetHypnotic

© 2022 - 2024 — McMap. All rights reserved.