Shared-memory IPC synchronization (lock-free)
Consider the following scenario:

Requirements:

  • Intel x64 Server (multiple CPU-sockets => NUMA)
  • Ubuntu 12, GCC 4.6
  • Two processes sharing large amounts of data over (named) shared-memory
  • Classical producer-consumer scenario
  • Memory is arranged in a circular buffer (with M elements)

Program sequence (pseudo code):

Process A (Producer):

int bufferPos = 0;
while( true )
{
    if( isBufferEmpty( bufferPos ) )
    {
        writeData( bufferPos );
        setBufferFull( bufferPos );

        bufferPos = ( bufferPos + 1 ) % M;
    }
}

Process B (Consumer):

int bufferPos = 0;
while( true )
{
    if( isBufferFull( bufferPos ) )
    {
        readData( bufferPos );
        setBufferEmpty( bufferPos );

        bufferPos = ( bufferPos + 1 ) % M;
    }
}

Now the age-old question: How to synchronize them effectively!?

  1. Protect every read/write access with mutexes
  2. Introduce a "grace period", to allow writes to complete: Read data in buffer N, when buffer(N+3) has been marked as full (dangerous, but seems to work...)
  3. ?!?

Ideally I would like a memory barrier that guarantees that all previous reads/writes are visible across all CPUs, along the lines of:

writeData( i );
MemoryBarrier();

//All data written and visible, set flag
setBufferFull( i );

This way, I would only have to monitor the buffer flags and could then read the large data chunks safely.
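Spelled out for both sides, the pairing I have in mind would look something like this (just a sketch using the same made-up helpers as above; note the consumer needs a matching barrier between reading the flag and reading the data):

// Producer: complete the data writes, then publish the flag
writeData( i );
MemoryBarrier();        // release: all data writes must be visible before the flag

setBufferFull( i );

// Consumer: observe the flag first, only then touch the data
if( isBufferFull( i ) )
{
    MemoryBarrier();    // acquire: no data reads may happen before the flag was seen
    readData( i );
    setBufferEmpty( i );
}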

Generally I'm looking for something along the lines of acquire/release fences as described by Preshing here:

http://preshing.com/20130922/acquire-and-release-fences/

(If I understand it correctly, C++11 atomics only work between threads of a single process, not across multiple processes.)

However, GCC's own memory barriers (__sync_synchronize, in combination with the compiler barrier asm volatile( "" ::: "memory" ) to be sure) don't seem to work as expected: writes become visible after the barrier, when I expected them to already be complete.

Any help would be appreciated...

BTW: Under Windows this just works fine using volatile variables (a Microsoft-specific behaviour)...

Emanation asked 5/3, 2014 at 19:39

Boost Interprocess has support for Shared Memory.

Boost Lockfree has a Single-Producer Single-Consumer queue type (spsc_queue). This is basically what you refer to as a circular buffer.

Here's a demonstration that passes IPC messages (in this case, of type string) using this queue, in a lock-free fashion.

Defining the types

First, let's define our types:

namespace bip = boost::interprocess;
namespace shm
{
    template <typename T>
        using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>;

    using char_alloc    =  alloc<char>;
    using shared_string =  bip::basic_string<char, std::char_traits<char>, char_alloc >;
    using string_alloc  =  alloc<shared_string>;

    using ring_buffer = boost::lockfree::spsc_queue<
        shared_string, 
        boost::lockfree::capacity<200> 
        // alternatively, pass
        // boost::lockfree::allocator<string_alloc>
    >;
}

For simplicity I chose to demo the fixed-capacity (compile-time sized) spsc_queue implementation, arbitrarily requesting a capacity of 200 elements.

The shared_string typedef defines a string that will transparently allocate from the shared memory segment, so its contents are also "magically" shared with the other process.
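For instance, constructing one of these strings just means passing the shared-memory allocator explicitly (a minimal sketch; char_alloc is an instance of the shm::char_alloc defined above):

shm::char_alloc char_alloc(segment.get_segment_manager());
shm::shared_string s("lives in shared memory", char_alloc); // character data is allocated inside the segment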

The consumer side

This is the simpler of the two sides, so:

int main()
{
    // create segment and corresponding allocator
    bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
    shm::char_alloc char_alloc(segment.get_segment_manager());

    shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();

This opens the shared memory area and locates the shared queue, creating either one if it doesn't exist yet. NOTE: This should be synchronized in real life.
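One way to do that (a sketch, not part of the demo; the mutex name is made up) is to serialize the whole open/construct phase with a boost::interprocess::named_mutex:

#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

bip::named_mutex init_mtx(bip::open_or_create, "MySharedMemory-init");
bip::scoped_lock<bip::named_mutex> lock(init_mtx); // held while we create/open
// ... open_or_create the segment and find_or_construct the queue here,
// then release the lock before entering the processing loop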

Now for the actual demonstration:

while (true)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    shm::shared_string v(char_alloc);
    if (queue->pop(v))
        std::cout << "Processed: '" << v << "'\n";
}

The consumer just infinitely monitors the queue for pending jobs and processes one each ~10ms.

The producer side

The producer side is very similar:

int main()
{
    bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
    shm::char_alloc char_alloc(segment.get_segment_manager());

    shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();

Again, add proper synchronization to the initialization phase. Also, you would probably want to make the producer responsible for freeing the shared memory segment in due time. In this demonstration, I just "let it hang", which is nice for testing; see below.
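Freeing the segment boils down to one call (again not part of the demo; run it only when no process is using the memory anymore):

// Removes the named segment from the system; returns false if it doesn't exist
bip::shared_memory_object::remove("MySharedMemory");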

So, what does the producer do?

    for (const char* s : { "hello world", "the answer is 42", "where is your towel" })
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
        queue->push({s, char_alloc});
    }
}

Right, the producer produces precisely 3 messages in ~750ms and then exits.

Note that, consequently, if we do the following (assuming a POSIX shell with job control):

./producer& ./producer& ./producer&
wait

./consumer&

this will print all 3x3 messages "immediately", while leaving the consumer running. Doing

./producer& ./producer& ./producer&

again after this will show the messages "trickle in" in real time (in bursts of 3 at ~250 ms intervals), because the consumer is still running in the background.

See the full code online in this gist: https://gist.github.com/sehe/9376856

Hembree answered 5/3, 2014 at 21:22

Comments:
Just noticed GCC 4.6 mentioned. It's possible that this version doesn't have uniform initializers; in that case just explicitly call the shared_string constructor when pushing onto the queue. – Hembree

Thank you for your detailed answer. It is always fascinating to see how even complex problems come down to 100 lines of code in Boost ;-) However, my question still remains: is there a construct of some kind that enforces memory visibility (something similar to smp_mb(), which is only available in the kernel)? Or in other words, how do mutexes enforce memory visibility? – Emanation

@Emanation It's 54 lines of code (not counting the makefile). I've just pushed a C++03 update to the gist. The Lockfree library uses atomics in the underlying implementation, so I trust it to employ the right barriers. I would be surprised if the fact that the memory pages are shared had any impact on the visibility semantics. There are some notes about interprocess support in the Boost Lockfree documentation. – Hembree

If you're looking for a shared mutex (a named mutex on Win32), see the Mutex or Synchronization mechanisms overview in the Boost Interprocess docs. – Hembree

OK, thank you again for your answer. I did take a look into the Boost Interprocess and atomics sources and only encountered "familiar" barrier implementations. So I still don't understand why the memory barrier (__sync_synchronize) does not work as expected in my implementation. However, as this goes beyond my original question, I will mark your (excellent) answer as accepted :-) – Emanation

Hi @sehe. I have a doubt about the deallocation: when does it happen in this case? It seems you are allocating a shared_string object each time you push into the queue, but the pop doesn't deallocate it? Is it deallocated when the local object goes out of scope? These allocations and deallocations happening in shared memory are costly operations (on each push), no? – Scantling

@Scantling Why do you think it doesn't deallocate? It does, obviously, on destruction of the shm::shared_string, using its allocator, exactly like any std::string would. – Hembree

@Hembree When does it destruct? Doesn't the destruction need to happen after the consumer has consumed from the queue? – Scantling

@Scantling Yes. That's why the consumer pops into their local copy: queue->pop(v). You can also assign it to a non-shared string instance, but that's not what the spsc_queue::pop interface was designed for (it was designed to allow lock-free operation). You can manually do the conversion afterwards, or consider the consume_one(f) operation instead (see the sketch below). – Hembree
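For illustration, a consumer based on consume_one instead of pop could look like this (a sketch; the functor runs on the element in place, before the slot is released):

queue->consume_one([](shm::shared_string const& v) {
    std::cout << "Processed: '" << v << "'\n";
});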
