Multithreaded single-reader single-writer FIFO queue

I need a queue for passing messages from one thread (A) to another (B). However, I haven't been able to find one that really does what I want: the ones I've found generally allow adding an item to fail, which in my situation is pretty much fatal, since the message needs to be processed and the thread really can't stop and wait for spare room.

  • Only thread A adds items, and only thread B reads them
  • Thread A must never block; thread B, however, is not performance-critical, so it can
  • Adding items must always succeed, so the queue can't have an upper size limit (short of running out of memory on the system)
  • If the queue is empty, thread B should wait until there is an item to process
Curbstone asked 19/9, 2009 at 10:15 Comment(4)
What threading library are you using? pthreads?Regardful
boost::thread and some bits of platform-specific code here and thereCurbstone
Your goal could result in running out of memory as you don't allow the writer thread to block or drop items. So if you reach a critical size limit of the queue you have to decide whether to drop items or to block the writer thread. Otherwise you drop items indirectly because your program fails :-)Copartner
Well the queue is most likely to have <100 items in it at a time, and I expect thread B to spend most of its time waiting because the queue is empty. However it is possible that thread B could get blocked by something for a significant time frame (the whole reason thread B is doing the work, not A), allowing the number of items to increase rapidly. If memory actually runs out, aborting is the best bet, since the only way to really continue would be to drop non-essential data, and I suppose thread A's audio processing falls into that category.Curbstone

Here's how to write a lock-free queue in C++:

http://www.ddj.com/hpc-high-performance-computing/210604448

But when you say "thread A must not block", are you sure that's the requirement? Windows is not a real-time operating system (and neither is Linux, in normal use). If you want Thread A to be able to use all available system memory, then it needs to allocate memory (or wait while someone else does). The OS itself cannot provide timing guarantees any better than those you'd have if both reader and writer took an in-process lock (i.e. a non-shared mutex) in order to manipulate the list. And in the worst case, adding a message is going to have to go to the OS to get memory.

In short, there's a reason those queues you don't like have a fixed capacity - it's so that they don't have to allocate memory in the supposedly low-latency thread.

So the lock-free code will generally be less block-y, but due to the memory allocation it isn't guaranteed to be, and performance with a mutex shouldn't be all that shabby unless you have a truly huge stream of events to process (like, you're writing a network driver and the messages are incoming ethernet packets).

So, in pseudo-code, the first thing I'd try would be:

Writer:
    allocate message and fill it in
    acquire lock
        append node to intrusive list
        signal condition variable
    release lock

Reader:
    for(;;)
        acquire lock
            for(;;)
                if there's a node
                    remove it
                    break
                else
                    wait on condition variable
                endif
            endfor
        release lock
        process message
        free message
    endfor
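
In real code, that pseudo-code might come out roughly as the sketch below, using std::mutex and std::condition_variable (C++11 names; the boost::thread equivalents are near-identical, and BlockingQueue is my own name for it):

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Sketch of the queue above: unbounded, so push never fails short of
// running out of memory, and pop blocks while the queue is empty.
template <typename T>
class BlockingQueue {
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable cond_;
public:
    // Writer (thread A): holds the lock only long enough to append.
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        cond_.notify_one();  // wake the reader if it is waiting
    }

    // Reader (thread B): blocks until an item is available. wait()
    // releases the lock while sleeping and re-acquires it before
    // returning, as discussed in the comments below.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop();
        return item;
    }
};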

Only if this proves to introduce unacceptable delays in the writer thread would I go to lock-free code, (unless I happened to have a suitable queue already lying around).
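
And if it did come to that, a minimal lock-free single-producer single-consumer queue in the spirit of the linked article might look like the sketch below (assuming C++11 atomics, which postdate this answer; the class name and layout are mine, not the article's):

#include <atomic>
#include <utility>

// Sketch of a lock-free SPSC queue: a singly-linked list with a dummy
// node. head_ is touched only by the consumer, tail_ only by the
// producer. (Assumes T is default-constructible, to keep this short.)
template <typename T>
class SpscQueue {
    struct Node {
        T value;
        std::atomic<Node*> next{nullptr};
    };
    Node* head_;   // consumer side: points at the current dummy node
    Node* tail_;   // producer side: points at the last node
public:
    SpscQueue() : head_(new Node()), tail_(head_) {}
    ~SpscQueue() {
        while (Node* n = head_) {
            head_ = n->next.load(std::memory_order_relaxed);
            delete n;
        }
    }

    // Producer: never takes a lock, but may still block inside new.
    void push(T v) {
        Node* n = new Node();
        n->value = std::move(v);
        tail_->next.store(n, std::memory_order_release);
        tail_ = n;
    }

    // Consumer: returns false instead of blocking when empty.
    bool pop(T& out) {
        Node* next = head_->next.load(std::memory_order_acquire);
        if (!next) return false;
        out = std::move(next->value);
        delete head_;   // the old dummy; next becomes the new dummy
        head_ = next;
        return true;
    }
};

Note that pop() returns immediately instead of sleeping, so the consumer still needs something to block on when the queue is empty (a semaphore or event), which the mutex version above gets for free from the condition variable.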

Gabriella answered 19/9, 2009 at 10:58 Comment(3)
At a lower level, one could employ a singly-linked list with the writing thread appending and the reading thread consuming. This can be lock-free, with the writing thread changing a NULL pointer to non-NULL and the reading thread changing non-NULL to NULL. A small private heap would provide good amortized performance for the list items. The writer mallocs and the reader frees. If the reader goes to sleep, a third thread C could be provided that speculatively enlarges the private heap, hiding the blocking nature of allocation from thread A.Fatling
your example will deadlock. While the reader is waiting on the cond, it is holding lock, which prevents the writer from acquiring the lock and signaling. You need to release the lock before you wait on condition variable and re-acquire immediately after.Chargeable
@Bobby: you are mistaken. Waiting on a condition variable releases the associated lock during the wait, and then re-acquires it before returning from the wait. This is part of what "condition variable" means -- if the API you're using doesn't do that for you then it's not a condition variable, it's more like a semaphore. And it's important that the API does it, since then your code can rely on the fact that releasing the lock and starting to wait on the condition occurs atomically -- that is to say no other thread can do anything under the lock before your thread is a waiter.Gabriella

Visual Studio 2010 is adding two new libraries which support this scenario very well: the Asynchronous Agents Library and the Parallel Patterns Library.

The agents library has support for asynchronous message passing and contains message blocks for sending messages to 'targets' and for receiving messages from 'sources'.

An unbounded_buffer is a template class which offers what I believe you are looking for:

#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace ::Concurrency;
using namespace ::std;

int main()
{
   //to hold our messages, the buffer is unbounded...
   unbounded_buffer<int> buf1;
   task_group tasks;

   //thread 1 sends messages to the unbounded_buffer
   //without blocking
   tasks.run([&buf1](){
      for(int i = 0; i < 10000; ++i)
         send(&buf1, i);
      //signal exit
      send(&buf1, -1);
   });

   //thread 2 receives messages and blocks if there are none

   tasks.run([&buf1](){
      int result;
      while((result = receive(&buf1)) != -1)
      {
           cout << "I got a " << result << endl;
      }
   });

   //wait for the threads to end
   tasks.wait();
}
Benevolent answered 19/9, 2009 at 15:33 Comment(3)
Does that really run under the Linux category?Displacement
FWIW, without parentheses around the assignment the receive loop, while(result = receive(&buf1)!=-1), would always output "I got a 1" because the != is evaluated before the =Goins
Is this a correct answer, please? Is it FIFO?Tarkany
  • Why not use STL <list> or <deque> with a mutex around add/remove? Is the thread-safety of STL insufficient?

  • Why not create your own (singly/doubly) linked-list-node class that contains a pointer, and have the items to be added/removed inherit from that? Thus making additional allocation unnecessary. You just frob a few pointers in threadA::add() and threadB::remove() and you are done. (While you'd want to do that under a mutex, the blocking effect on threadA would be negligible unless you did something really wrong...)

  • If you're using pthreads, check out sem_post() and sem_wait(). The idea is that threadB can block indefinitely via sem_wait() until threadA puts something on the queue. Then threadA invokes sem_post(), which wakes up threadB to do its work, after which threadB can go back to sleep. It's an efficient way of handling asynchronous signaling, supporting things like multiple threadA::add()'s before threadB::remove() completes. A sketch combining this with the first suggestion follows this list.
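
A minimal sketch combining the first and third suggestions: a std::deque guarded by a pthread mutex, plus a POSIX semaphore whose count mirrors the number of queued items (the class and method names are illustrative):

#include <deque>
#include <pthread.h>
#include <semaphore.h>

// Sketch: the semaphore counts queued items; the mutex protects the
// deque itself. Multiple add()s before a remove() just raise the count.
template <typename T>
class SemQueue {
    std::deque<T> items_;
    pthread_mutex_t mutex_;
    sem_t available_;
public:
    SemQueue() {
        pthread_mutex_init(&mutex_, nullptr);
        sem_init(&available_, 0, 0);  // starts at 0: nothing queued
    }
    ~SemQueue() {
        sem_destroy(&available_);
        pthread_mutex_destroy(&mutex_);
    }

    // threadA::add(): push, then sem_post to wake the reader.
    void add(T item) {
        pthread_mutex_lock(&mutex_);
        items_.push_back(item);
        pthread_mutex_unlock(&mutex_);
        sem_post(&available_);
    }

    // threadB::remove(): sem_wait blocks until an item exists.
    T remove() {
        sem_wait(&available_);
        pthread_mutex_lock(&mutex_);
        T item = items_.front();
        items_.pop_front();
        pthread_mutex_unlock(&mutex_);
        return item;
    }
};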

Salic answered 20/9, 2009 at 4:4 Comment(0)

You might want to consider your requirements: is it truly the case that A can't discard any queue items whatsoever? Or is it that you don't want B to pull two consecutive elements out of the queue that weren't consecutive going in, because that would misrepresent the sequence of events?

For example, if this is some kind of data logging system, you (understandably) wouldn't want gaps in the record; but without unlimited memory, the reality is that in some corner case somewhere you probably could overrun your queue capacity.

In that case, one solution is to have some kind of special element that can be put in the queue, which represents the case of A discovering that it had to drop items. Basically you keep one extra element around, which is null most of the time. Every time A goes to add an element to the queue, if this extra element is not null, it goes in first. If A discovers there is no room in the queue, then it configures this extra element to say 'hey, the queue was full'.

This way, A never blocks, you can drop elements when the system is Very Busy, and you don't lose sight of the fact that elements were dropped: as soon as queue space becomes available, this mark goes in to indicate where the data drop occurred. Process B then does whatever it needs to do when it discovers it has pulled this overrun mark element out of the queue.
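
As a sketch of that scheme (C++17 here for std::optional; all names are hypothetical, and the 'extra element' becomes a sticky overflow flag plus a marker message):

#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

// Sketch: a bounded queue whose writer never blocks. When full it
// drops the item but remembers the overrun, and enqueues a special
// marker as soon as space is available again.
template <typename T>
class DroppingQueue {
public:
    struct Message {
        bool overrun;   // true: "data was dropped here" marker
        T payload;      // meaningless when overrun is true
    };

    explicit DroppingQueue(std::size_t capacity) : capacity_(capacity) {}

    // Writer (A): never blocks; drops on overflow but records it.
    void add(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pending_overrun_ && items_.size() < capacity_) {
            items_.push_back(Message{true, T{}});  // the marker goes in first
            pending_overrun_ = false;
        }
        if (items_.size() < capacity_)
            items_.push_back(Message{false, std::move(item)});
        else
            pending_overrun_ = true;  // full: drop the item, remember it
    }

    // Reader (B): an empty result means the queue is currently empty.
    std::optional<Message> try_remove() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty())
            return std::nullopt;
        Message m = std::move(items_.front());
        items_.pop_front();
        return m;
    }

private:
    std::deque<Message> items_;
    std::mutex mutex_;
    bool pending_overrun_ = false;  // the "extra element": set when items were dropped
    const std::size_t capacity_;
};

When B pulls a message with overrun set, it knows items were dropped at that point in the stream.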

Dipietro answered 19/9, 2009 at 11:41 Comment(0)
