Is there anything like asynchronous BlockingCollection<T>?

I would like to await the result of BlockingCollection<T>.Take() asynchronously, so that I do not block the thread. I'm looking for something like this:

var item = await blockingCollection.TakeAsync();

I know I could do this:

var item = await Task.Run(() => blockingCollection.Take());

but that rather defeats the purpose, because another thread (from the ThreadPool) gets blocked instead.

Is there any alternative?

Untutored answered 20/1, 2014 at 2:30 Comment(3)
I don't get this. If you use await Task.Run(() => blockingCollection.Take()), the task will be performed on another thread and your UI thread won't be blocked. Isn't that the point? - Flamenco
@Selman22, this is not a UI app. It is a library exporting a Task-based API. It can be used from ASP.NET, for example. The code in question would not scale well there. - Untutored
Would it still be a problem if ConfigureAwait was used after the Run()? [ed. never mind, I see what you're saying now] - Corinthians

There are four alternatives that I know of.

The first is Channels (System.Threading.Channels), which provides a thread-safe queue that supports asynchronous read and write operations. Channels are highly optimized and optionally support dropping some items if a threshold is reached.
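As a rough sketch of the dropping behavior, assuming the System.Threading.Channels package (part of .NET Core 3.0 and later) and a capacity of 2 picked purely for illustration, a bounded channel can be told to discard its oldest item once it is full:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Capacity of 2 is an arbitrary value chosen for this illustration.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest,
});

// In a drop mode TryWrite does not have to wait; writing 3 evicts the oldest item (1).
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryWrite(3);
channel.Writer.Complete();

// ReadAllAsync finishes once the channel is marked complete and drained.
var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received.Add(item);
}

Console.WriteLine(string.Join(", ", received)); // 2, 3
```

With the default FullMode (Wait), the third write would instead wait asynchronously for free space when written with WriteAsync.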

The next is BufferBlock<T> from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.
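A minimal single-consumer sketch, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced; the item values are made up for illustration:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

var buffer = new BufferBlock<int>();

// Producer side: Post is the synchronous put, SendAsync the asynchronous one.
buffer.Post(1);
await buffer.SendAsync(2);
buffer.Complete();

// Single consumer: OutputAvailableAsync returns false once the block is completed and empty.
var sum = 0;
while (await buffer.OutputAvailableAsync())
{
    sum += await buffer.ReceiveAsync();
}

Console.WriteLine(sum); // 3
```

This loop is only safe with one consumer; with several, another consumer could take the item between OutputAvailableAsync and ReceiveAsync.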

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow synchronous and asynchronous puts and takes.

Yeaton answered 20/1, 2014 at 3:45 Comment(5)
GitHub link for when CodePlex finally shuts down: github.com/StephenCleary/AsyncEx - Blackandblue
The API documentation contains the method AsyncCollection.TryTakeAsync, but I can't find it in the downloaded Nito.AsyncEx.Coordination.dll 5.0.0.0 (latest version). The referenced Nito.AsyncEx.Concurrent.dll does not exist in the package. What am I missing? - Camilla
@TheodorZoulias: That method was removed in v5. The v5 API docs are here. - Yeaton
Oh, thanks. It looks like it was the easiest and safest way to enumerate the collection: while ((result = await collection.TryTakeAsync()).Success) { }. Why was it removed? - Camilla
@TheodorZoulias: Because "Try" means different things to different people. I'm thinking of adding a "Try" method back in, but it would actually have different semantics than the original method. I'm also looking at supporting async streams in a future version, which would definitely be the best method of consumption when supported. - Yeaton

...or you can do this:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
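        // SemaphoreSlim.Release throws ArgumentOutOfRangeException for a count below 1,
        // so skip the release when the source was empty.
        if (n > 0)
        {
            _sem.Release(n);
        }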
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

Simple, fully functional asynchronous FIFO queue.

Note: SemaphoreSlim.WaitAsync was added in .NET 4.5; before that, this was not all that straightforward.

Doyenne answered 24/2, 2018 at 14:15 Comment(9)
What's the use of the infinite for loop? If the semaphore is released, the queue has at least one item to dequeue, no? - Rust
@Rust there might be a race condition if multiple consumers are blocked. We cannot know for sure that there aren't at least two competing consumers, and we don't know whether both of them manage to wake up before they get to dequeue an item. In the event of a race, if one doesn't manage to dequeue, it will go back to sleep and wait for another signal. - Doyenne
If two or more consumers make it past WaitAsync(), then there are an equivalent number of items in the queue, and thus they will always dequeue successfully. Am I missing something? - Bach
This is a blocking collection: the semantics of TryDequeue are to return with a value or not return at all. Technically, if you have more than one reader, the same reader can consume two (or more) items before any other reader is fully awake. A successful WaitAsync is just a signal that there may be items in the queue to consume; it's not a guarantee. - Doyenne
@JohnLeidegren "If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore." (from learn.microsoft.com/en-us/dotnet/api/…) How can a successful WaitAsync not have items in the queue? If N releases wake up more than N consumers, then the semaphore is broken, isn't it? - Cerumen
@AshishNegi it's just a consequence of the TryDequeue API design. This way, if for whatever reason the queue doesn't successfully dequeue, its behavior is still defined. We could just as well throw an exception. I may have adapted this example from code that used a monitor, where we actually could wake up multiple consumers. I think it's a remnant of that. I can't see why it would be absolutely necessary. - Doyenne
@JohnLeidegren is there a reason not to have AsyncQueue implement IProducerConsumerCollection so it can be more flexible? - Cutaneous
@Mr.Boy No, other than it complicates things for the purpose of explaining. Have at it. - Doyenne
Cheers @JohnLeidegren. Wanted to check I'd not missed something. It seems a nice route for those who don't want to (or can't) use Channel. - Cutaneous

The asynchronous (non-blocking) alternative of the BlockingCollection<T> is the Channel<T> class. It offers almost the same functionality, plus some extra features. You can instantiate a Channel<T> using the Channel's static factory methods, as shown below (demonstrating the default values of all available options).

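// Option 1: an unbounded channel (the values shown are the defaults).
Channel<Item> channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions()
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
});

// Option 2: a bounded channel (the values shown are the defaults; capacity is an int you supply).
// Use this instead of, not in addition to, the declaration above.
Channel<Item> channel = Channel.CreateBounded<Item>(new BoundedChannelOptions(capacity)
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
    FullMode = BoundedChannelFullMode.Wait,
});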

The most striking difference is that the Channel<T> exposes a Writer and a Reader facade. So you can pass the Writer facade to a method that plays the role of the producer, and similarly the Reader facade to a method that plays the role of the consumer. The Writer is only allowed to add items to the channel, and mark it as completed. The Reader is only allowed to take items from the channel, and await its completion. Both facades expose only non-blocking APIs. For example, the ChannelWriter<T> has a WriteAsync method that returns a ValueTask. If you have some reason to block on these APIs, for example if one worker of your producer/consumer pair has to be synchronous, then you can block with .AsTask().GetAwaiter().GetResult(), but this will not be as efficient as using a BlockingCollection<T>. If you want to learn more about the similarities and differences between the Channel<T> and BlockingCollection<T> classes, take a look at this answer.
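To make the facade split concrete, here is a minimal sketch. The ProduceAsync and ConsumeAsync names and the int payload are hypothetical and chosen only for illustration:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// The producer only sees the Writer facade: it can add items and mark completion.
async Task ProduceAsync(ChannelWriter<int> writer)
{
    for (var i = 1; i <= 3; i++)
    {
        await writer.WriteAsync(i);
    }
    writer.Complete();
}

// The consumer only sees the Reader facade: it can take items and await completion.
async Task<int> ConsumeAsync(ChannelReader<int> reader)
{
    var sum = 0;
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out var item))
        {
            sum += item;
        }
    }
    await reader.Completion; // surfaces any exception passed to Complete(error)
    return sum;
}

var producer = ProduceAsync(channel.Writer);
var total = await ConsumeAsync(channel.Reader);
await producer;

Console.WriteLine(total); // 6
```

The WaitToReadAsync/TryRead loop works with any number of consumers, because TryRead simply returns false if another consumer took the item first.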

An implementation of a custom AsyncBlockingCollection<T> class, having only the most basic features, can be found in the 3rd revision of this answer.

Camilla answered 30/4, 2019 at 5:43 Comment(2)
As an afterthought, I now think that the class name AsyncBlockingCollection is nonsensical. Something cannot be asynchronous and blocking at the same time, since these two concepts are the exact opposites! - Camilla
But still, it IS an async version of the BlockingCollection :) - Cadwell

This is super-simple, but it serves my needs.

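    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public static class BlockingCollectionEx
    {
        // Polls the collection without blocking a thread.
        // inner_delay is the polling interval in milliseconds.
        public static async Task<T> TakeAsync<T>(this BlockingCollection<T> bc, CancellationToken token, int inner_delay = 10)
        {
            while (!token.IsCancellationRequested)
            {
                if (bc.TryTake(out T el))
                    return el;
                else
                    // Passing the token ends the delay promptly on cancellation.
                    await Task.Delay(inner_delay, token);
            }

            throw new OperationCanceledException(token);
        }
    }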
Jayejaylene answered 20/3, 2023 at 16:33 Comment(4)
Instead of while (!token.IsCancellationRequested) and throw new OperationCanceledException();, it is simpler and better to just pass the token to the TryTake method, like in Dejisys's answer: TryTake(out T el, 0, token) - Camilla
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Leoni
Adding to @TheodorZoulias's comment, I think it can be simplified down to this: while (!bc.TryTake(out T el, inner_delay, token)) ; return el; - Glynis
@Glynis the intention of the TakeAsync is to be non-blocking. The TryTake overload with the millisecondsTimeout parameter is blocking, so it defeats the purpose. - Camilla
