The collection that seems more suitable for your case is the Channel<T>
. This is an asynchronous version of the BlockingCollection<T>
, and internally it's based on the same storage (the ConcurrentQueue<T>
collection)¹. The similarities are:
- They both can be configured to be bounded or unbounded.
- A consumer can take a message, even if none is currently available. In this case the
Take
/ReadAsync
call will block either synchronously or asynchronously until a message can be consumed, or the collection completes, whatever comes first.
- A producer can push a message, even if the collection is currently full. In this case the
Add
/WriteAsync
call will block either synchronously or asynchronously until there is space available for the message, or the collection completes, whatever comes first.
- A consumer can enumerate the collection in a consuming fashion, with a
foreach
/await foreach
loop. Each message received in the loop is consumed by this loop, and will never be available to other consuming loops that might be active by other consumers in parallel.
Some features of the Channel<T>
that the BlockingCollection<T>
lacks:
- A
Channel<T>
exposes two facades, a Writer
and a Reader
, that allow a better separation between the roles of the producer and the consumer. In practice this can be more of an annoyance than a useful feature IMHO, but nonetheless it's part of the experience of working with a channel.
- A
ChannelWriter<T>
can be optionally completed with an error. This error is propagated to the consumers of the channel.
- A
ChannelReader<T>
has a Completion
property of type Task
.
- A bounded
Channel<T>
can be configured to be lossy, so that it drops old buffered messages automatically in order to make space for new incoming messages.
Some features of the BlockingCollection<T>
that the Channel<T>
lacks:
- There is no direct support for timeout when writing/reading messages. This can be achieved indirectly (but precariously, see below) with timer-based
CancellationTokenSource
s.
- The contents of a channel cannot be enumerated in a non-consuming fashion.
- Some auxiliary features like the
BlockingCollection<T>.TakeFromAny
method are not available.
- A channel cannot be backed by other internal collections, other than the
ConcurrentQueue<T>
. So it can't have, for example, the behavior of a stack instead of a queue.
Other differences:
- A
BlockingCollection<T>
is created using the constructor of the class. On the contrary a Channel<T>
is created using the static
factory methods of the Channel
class.
- The
BlockingCollection<T>
is not sealed, but it is practically not extendable (only the Dispose
method is virtual
and protected
). It is possible though to inject custom functionality by passing in the constructor a custom IProducerConsumerCollection<T>
implementation. On the contrary the Channel<T>
is an abstract class, upon which derived classes can be built (if you are brave enough, and ready to handle the complexity of ValueTask
s).
Caveat:
There is a nasty memory leak issue that is triggered when a channel is idle (empty with an idle producer, or full with an idle consumer), and at the other end a hyper-active consumer or producer attempts continuously to read/write messages with timer-based CancellationTokenSource
s. Each such canceled operation leaks about 800 bytes. The leak is resolved automatically when the first read/write operation completes successfully. This issue is known for more than two years, and Microsoft has not decided yet what to do with it.
¹ Actually the only channels that are backed by a ConcurrentQueue<T>
are unbounded channels configured with SingleReader = false
(the default). Currently there are three built-in implementations, two for unbounded channels (UnboundedChannel<T>
, SingleConsumerUnboundedChannel<T>
) and one for bounded channels (BoundedChannel<T>
).