Creating a blocking Queue<T> in .NET?

I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size all threads that are filling the queue will be blocked on add until an item is removed from the queue.

The solution below is what I am using right now and my question is: How can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Kannada answered 9/2, 2009 at 22:2 Comment(4)
.NET now has built-in classes to help with this scenario. Most of the answers listed here are obsolete. See the most recent answers at the bottom. Look into thread-safe blocking collections. The answers may be obsolete, but it's still a good question!Manvel
I think it's still a good idea to learn about Monitor.Wait/Pulse/PulseAll even if we have new concurrent classes in .NET.Isobath
Agree with @thewpfguy. You'll want to comprehend the basic locking mechanisms behind the scenes. Also worth noting that System.Collections.Concurrent didn't exist until April 2010, and then only in Visual Studio 2010 and above. Definitely not an option for the VS2008 holdouts...Disordered
If you're reading this now, take a look at System.Threading.Channels for a multi-writer/multi-reader, bounded, optionally-blocking implementation of this for .NET Core and .NET Standard.Beaut

That looks very unsafe (very little synchronization); how about something like:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edit)

In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
Parenteau answered 9/2, 2009 at 22:5 Comment(24)
Wouldn't you also need to Monitor.PulseAll in case of close?Kannada
How about changing the wait to a WaitAny and passing in a terminate waithandle on construction ...Opus
@sambo99 - for what purpose? At the moment it uses Monitor, which is the cheapest of all the locking constructs, as it doesn't need to go to the OS. A bool and a Monitor is quite a bit cheaper than a waithandle...Parenteau
Yerp, your implementation here is solid, easy to understand and fast. Only advantage with a WaitAny type approach is that you could have a single event that triggers shutdown without needing a Close method ... It may be useful sometimes.Opus
another thing I would look at here is probably making size queue IDisposable. Also, what happens if your queue object goes out of scope while threads are blocked. It would probably be a bugger to debug.Opus
@Marc- an optimisation, if you were expecting the queue to always reach capacity, would be to pass the maxSize value into the constructor of the Queue<T>. You could add another constructor to your class to accomodate for that.Alps
Why SizeQueue, why not FixedSizeQueue?Vacuity
If you lock on the queue, won't that effectively prevent other threads from messing with it? For instance, add, first it locks the queue, and then if the queue is full, it then waits for the queue to be signalled, but how can it be signalled if no other thread can lock it first? There must be something I'm not getting here...Interstice
@Lasse - it releases the lock(s) during Wait, so other threads can acquire it. It reclaims the lock(s) when it wakes up.Parenteau
Nice, as I said, there was something I wasn't getting :) That sure makes me want to revisit some of my thread-code....Interstice
What happens when a thread wakes up and the lock is already taken by someone else, it will block waiting for the lock to become free and then claim it?Interstice
Actually I can think of many questions around this subject, is there a link to some material I can read? If not, I'll try to summarize up some questions in a post of my own here.Interstice
@Lasse - when it is pulsed it moves from the sleeping-queue to the ready-queue (as though it was just now trying to take the lock). When the thread that pulsed releases the lock, the next thread in the ready-queue (which might be neither of those) gets a go. And so on. MSDN may help, but... Jon's pages have some coverage too.Parenteau
@Lasse - note the drop down at the bottom for the 17 topics: yoda.arachsys.com/csharp/threadsParenteau
Would not Pulse instead of PulseAll work fine in Enqueue/Dequeue?Tetrabasic
@Tetrabasic - possibly, but then you largely have to know which thread is waiting. And in the case of pulling the queue down you actively want to wake up all the workers. In most cases there is only one worker, making it a moot point. In the more complex cases, you've got enough to worry about, without stressing over this - you could carefully add it, I suppose.Parenteau
@Tetrabasic In answer to my own question: no, Pulse is not enough since it wakes up a single thread and we do not know whether this thread is in the Enqueue or Dequeue method.Tetrabasic
In case anyone cares I chose to not have a maximum queue size. Thus the enqueue call never blocks meaning dequeue does not need to call pulse at all and enqueue can use Pulse instead of PulseAll since only dequeue calls are waiting. Note that when closing the queue PulseAll is of course necessary. Also thank you Marc for the quick answer.Tetrabasic
I know this is an old post, but I have a small question. When Enqueue calls PulseAll it releases the monitor to every thread waiting and they race to lock the monitor object again, is it at all possible that if another writer will win and say the queue is full at this point, it'll enter Monitor.Wait again and then we end up with a deadlock?Parton
@Stan that isn't a deadlock; a Wait releases the lock to the next in the list, so at some point a reader will get it, and will reawaken that writer when it has made spaceParenteau
@MarcGravell ahh, I keep forgetting that the .Wait actually releases the lock!! Makes perfect sense now. Thx.Parton
bool TryEnqueue(T item) is also needed to cleanly exit the threads blocked once queue.Count reaches maxSize. The items already in the queue will not be consumed, but not much can be done about that.Desert
@hlpPy it certainly could be required, if you need a method that appends if there is room, else doesn't append and returns immediately. I didn't need that :)Parenteau
@MarcGravell : Based on your class I had to implement a TryDequeueWhere, to dequeue an object based on certain criteria. Could you have a look at this : https://mcmap.net/q/144979/-dequeue-from-queue-with-where-expression/1959238Fiend

Use the .NET 4 BlockingCollection; to enqueue use Add(), to dequeue use Take(). It internally uses a non-blocking ConcurrentQueue. More info here: Fast and Best Producer/consumer queue technique BlockingCollection vs ConcurrentQueue
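For reference, a minimal sketch of that approach (the capacity and item values here are illustrative, not from the answer): a bounded BlockingCollection where Add blocks once the bound is reached, and GetConsumingEnumerable lets the consumer drain the queue and exit cleanly after CompleteAdding.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Bounded to 10 items: Add blocks when the collection is full, Take blocks when empty.
var queue = new BlockingCollection<int>(boundedCapacity: 10);

var producer = Task.Run(() =>
{
    for (int i = 0; i < 100; i++)
        queue.Add(i);          // blocks whenever 10 items are already queued
    queue.CompleteAdding();    // signals consumers that no more items are coming
});

int sum = 0;
var consumer = Task.Run(() =>
{
    // Ends cleanly once CompleteAdding has been called and the queue is drained.
    foreach (var item in queue.GetConsumingEnumerable())
        sum += item;
});

Task.WaitAll(producer, consumer);
Console.WriteLine(sum);        // sum of 0..99
```

Note that CompleteAdding plays the same role as the Close method in the Monitor-based answer above: it lets blocked consumers exit instead of waiting forever.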

Synclastic answered 9/12, 2011 at 15:22 Comment(0)

You can use the BlockingCollection and ConcurrentQueue in the System.Collections.Concurrent Namespace

public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue.
    /// Use Add and TryAdd for Enqueue and TryEnqueue, and Take and TryTake for Dequeue and TryDequeue functionality.
    /// </summary>
    public ProducerConsumerQueue()
        : base(new ConcurrentQueue<T>())
    {
    }

    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue with an upper bound.
    /// Use Add and TryAdd for Enqueue and TryEnqueue, and Take and TryTake for Dequeue and TryDequeue functionality.
    /// </summary>
    /// <param name="maxSize">The maximum number of items the queue can hold.</param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }
}
Pratte answered 6/2, 2013 at 10:39 Comment(3)
BlockingCollection defaults to ConcurrentQueue. So, I don't think this is necessary.Cellule
Does BlockingCollection preserve ordering like a queue?Silesia
Yes, when it is initialized with a ConcurrentQueuePratte

"How can this be improved?"

Well, you need to look at every method in your class and consider what would happen if another thread was simultaneously calling that method or any other method. For example, you put a lock in the Remove method, but not in the Add method. What happens if one thread Adds at the same time as another thread Removes? Bad things.

Also consider that a method can return a second object that provides access to the first object's internal data - for example, GetEnumerator. Imagine one thread is going through that enumerator, another thread is modifying the list at the same time. Not good.

A good rule of thumb is to make this simpler to get right by cutting down the number of methods in the class to the absolute minimum.

In particular, don't inherit from another container class, because you will expose all of that class's methods, giving callers a way to corrupt the internal data, or to see partially complete changes to the data (just as bad, because the data appears corrupted at that moment). Hide all the details and be completely ruthless about how you allow access to them.

I'd strongly advise you to use off-the-shelf solutions - get a book about threading or use a 3rd-party library. Otherwise, given what you're attempting, you're going to be debugging your code for a long time.

Also, wouldn't it make more sense for Remove to return an item (say, the one that was added first, as it's a queue), rather than the caller choosing a specific item? And when the queue is empty, perhaps Remove should also block.

Update: Marc's answer actually implements all these suggestions! :) But I'll leave this here as it may be helpful to understand why his version is such an improvement.

Dina answered 9/2, 2009 at 22:51 Comment(0)

I just knocked this up using the Reactive Extensions and remembered this question:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Not necessarily entirely safe, but very simple.

Beaut answered 7/5, 2010 at 14:14 Comment(3)
What is Subject<t>? I don't have any resolver for its namespace.Recuperator
It's part of the Reactive Extensions.Beaut
Not an answer. This doesn't answer the question at all.Paronym

This is what I came up with for a thread-safe bounded blocking queue.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity", "Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);
        }
        sema_NotEmpty.Release();
    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {
            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);
        }
        sema_NotFull.Release();

        return item;
    }
}
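To answer the instantiation question that tends to come up: here is one way the class above might be driven from a couple of threads. This is a hedged sketch - the capacity, thread count, and item values are illustrative, and it assumes the BlockingBuffer<T> class above is compiled alongside it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

var buffer = new BlockingBuffer<string>(5);   // capacity 5: put() blocks when full
var results = new List<string>();

var producer = new Thread(() =>
{
    for (int i = 0; i < 20; i++)
        buffer.put("item " + i);              // blocks while 5 items are waiting
});

var consumer = new Thread(() =>
{
    for (int i = 0; i < 20; i++)
        results.Add(buffer.take());           // blocks while the buffer is empty
});

producer.Start();
consumer.Start();
producer.Join();
consumer.Join();

Console.WriteLine(results.Count);             // all 20 items made the round trip
```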
Engedus answered 28/5, 2009 at 12:55 Comment(2)
Could you provide some code samples of how I'd queue up some thread functions using this library, including how I'd instantiate this class?Recuperator
This question/response is a bit dated. You should look at the System.Collections.Concurrent namespace for blocking queue support.Engedus

I haven't fully explored the TPL, but it might have something that fits your needs, or at the very least, some Reflector fodder to snag inspiration from.

Hope that helps.

Curfew answered 9/2, 2009 at 22:11 Comment(1)
I am aware that this is old, but my comment is for newcomers to SO, as OP already knows this today. This is not an answer, this should have been a comment.Alguire

Starting with .NET Core 3.0 / .NET 5.0 you can use System.Threading.Channels.
Benchmarks from the article Asynchronous Producer Consumer Pattern in .NET (C#) show a significant speed boost over BlockingCollection!
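A minimal sketch of that approach (the capacity and values here are illustrative): a bounded channel created with BoundedChannelFullMode.Wait, so writers asynchronously wait when the channel is full - the blocking-on-add behavior the question asks for.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// FullMode.Wait makes WriteAsync wait (rather than drop or throw) when the channel is full.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait
});

var producer = Task.Run(async () =>
{
    for (int i = 0; i < 100; i++)
        await channel.Writer.WriteAsync(i);  // waits while 10 items are buffered
    channel.Writer.Complete();               // readers drain the channel and finish
});

int sum = 0;
var consumer = Task.Run(async () =>
{
    // ReadAllAsync completes once the writer is done and the channel is empty.
    await foreach (var item in channel.Reader.ReadAllAsync())
        sum += item;
});

await Task.WhenAll(producer, consumer);
Console.WriteLine(sum);                      // sum of 0..99
```

Unlike the Monitor-based solutions above, blocked writers here wait asynchronously, so no thread is tied up while the channel is full.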

Veradia answered 19/11, 2020 at 16:30 Comment(0)

Well, you might look at the System.Threading.Semaphore class. Other than that - no, you have to build this yourself. AFAIK there is no such built-in collection.

Peduncle answered 9/2, 2009 at 22:5 Comment(6)
I looked at that for throttling the number of threads that are accessing a resource but it doesn't allow you to block all access to a resource based on some condition (like Collection.Count). AFAIK anywayKannada
Well, you do that part yourself, just like you do now. Simply instead of MaxSize and _FullEvent you have the Semaphore, which you initialize with the right count in the constructor. Then, upon every Add/Remove you call WaitOne() or Release().Peduncle
It's not much different than what you have now. Just more simple IMHO.Peduncle
Can you give me an example showing this working? I didn't see how to adjust the size of a Semaphore dynamically, which this scenario requires, since you have to be able to block all access only when the queue is full.Kannada
Ahh, changing size! Why didn't you say so immediately? OK, then a semaphore is not for you. Good luck with this approach!Peduncle
I am aware that this is old, but my comment is for newcomers to SO, as OP already knows this today. This is not an answer, this should have been a comment.Alguire

If you want maximum throughput, allowing multiple readers to read and only one writer to write, the BCL has something called ReaderWriterLockSlim that should help slim down your code...

Salesman answered 9/2, 2009 at 22:4 Comment(4)
I want none to be able to write if the queue is full though.Kannada
So you combine it with a lock. Here are some very good examples albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle albahari.com/threading/part4.aspxSalesman
With queue/dequeue, everyone is a writer... an exclusive lock will perhaps be more pragmaticParenteau
I am aware that this is old, but my comment is for newcomers to SO, as OP already knows this today. This is not an answer, this should have been a comment.Alguire

© 2022 - 2024 — McMap. All rights reserved.