I recently built a consumer/producer system using `ConcurrentQueue<T>` and `SemaphoreSlim`. I then made an alternative system utilizing the new `Channel<T>` class from `System.Threading.Channels`.
After benchmarking both systems with BenchmarkDotNet, writing 1000 items 1000 times into each system (and waiting for the reader to finish), I get the following results:
| Method | ItemsCount | Iterations | Mean | Error | StdDev | Median | Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
| MyQueue | 1000 | 1000 | 19,379.4 us | 1,230.30 us | 3,569.33 us | 18,735.6 us | 8235.02 KB |
| MyChannel | 1000 | 1000 | 45,858.2 us | 1,298.42 us | 3,704.46 us | 45,689.2 us | 72.11 KB |
The `ConcurrentQueue` implementation seems to be significantly faster than the `Channel`.
I tried setting `SingleReader` and `SingleWriter` to `true` on the `Channel`, but the results ended up being worse:
| Method | ItemsCount | Iterations | Mean | Error | StdDev | Median | Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
| MyQueue | 1000 | 1000 | 18,578.7 us | 1,238.46 us | 3,493.10 us | 18,192.7 us | 8236.31 KB |
| MyChannel | 1000 | 1000 | 50,506.9 us | 1,383.73 us | 3,857.28 us | 49,635.8 us | 170.73 KB |
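For reference, the single-reader/single-writer variant only changes how the channel is created, roughly like this (simplified; the code further down shows only the default-options version):

```csharp
_channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions
{
    // There is exactly one reader task, and Write is only called from the benchmark thread.
    SingleReader = true,
    SingleWriter = true
});
```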
Is there a flaw in my implementation or in the benchmark itself? If not, and these results are valid, when should Channels be preferred over a plain `ConcurrentQueue`?
The simplified code of both classes looks like this:
```csharp
public class MyQueue
{
    ConcurrentQueue<Item> _queue;
    SemaphoreSlim _readerFinishedSemaphore;
    SemaphoreSlim _readSemaphore;
    bool _completed = false;

    public void Setup()
    {
        _queue = new();
        _readerFinishedSemaphore = new(0);
        _readSemaphore = new(0);
        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        while (true)
        {
            await _readSemaphore.WaitAsync();
            while (_queue.TryDequeue(out var item))
            {
                // do stuff ...
            }
            if (_completed) break;
        }
        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _queue.Enqueue(i);
        }
        _readSemaphore.Release();
    }

    public void CompleteAndWaitForReader()
    {
        _completed = true;
        _readSemaphore.Release();
        _readerFinishedSemaphore.Wait();
    }
}
```
And for Channels:
```csharp
public class MyChannel
{
    Channel<Item> _channel = null!;
    SemaphoreSlim _readerFinishedSemaphore = null!;

    public void Setup()
    {
        _readerFinishedSemaphore = new(0);
        _channel = Channel.CreateUnbounded<Item>();
        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        var reader = _channel.Reader;
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                // do stuff ...
            }
        }
        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _channel.Writer.TryWrite(i);
        }
    }

    public void CompleteAndWaitForReader()
    {
        _channel.Writer.Complete();
        _readerFinishedSemaphore.Wait();
    }
}
```
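As a side note, the extra `_readerFinishedSemaphore` isn't essential to the channel version; keeping the reader `Task` around and waiting on it gives the same handshake. A minimal sketch of that variant (everything except `Item` is renamed here for illustration; it isn't part of the benchmark above):

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public class MyChannelViaTask
{
    Channel<Item> _channel = null!;
    Task _readerTask = null!;

    public void Setup()
    {
        _channel = Channel.CreateUnbounded<Item>();
        // Keep the reader Task instead of signalling a separate semaphore.
        _readerTask = Task.Run(ReaderAsync);
    }

    private async Task ReaderAsync()
    {
        var reader = _channel.Reader;
        // WaitToReadAsync returns false once the writer has completed
        // and all buffered items have been read.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                // do stuff ...
            }
        }
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _channel.Writer.TryWrite(i);
        }
    }

    public void CompleteAndWaitForReader()
    {
        _channel.Writer.Complete();
        _readerTask.Wait(); // returns once ReaderAsync has drained the channel
    }
}
```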
The benchmarking code looks like this:
```csharp
// items are generated in [GlobalSetup] using fixed-seed Random class

[IterationSetup]
public void IterationSetup()
{
    myChannel = new MyChannel();
    myQueue = new MyQueue();
    myChannel.Setup();
    myQueue.Setup();
}

[Benchmark]
public void MyQueue()
{
    for (int i = 0; i < Iterations; i++)
        myQueue.Write(items);
    myQueue.CompleteAndWaitForReader();
}

// same for MyChannel
```
It should be noted that I am running this on .NET 8.0.0-preview.6.23329.4.
Comments:

Does your real-life scenario have a `Writer` that writes a single batch of items and then completes? In case it does, I would suggest an even faster alternative: use a `ConcurrentQueue<Item[]>` or `Channel<Item[]>` and pass the single batch as an array. Or even simpler, make the `Writer` a `Task<Item[]>` method, and the `Reader` will just `await` the `Writer()` to get the single batch. – Indetermination

A `Channel` uses a `ConcurrentQueue` as part of its implementation, so it can really only ever be slower. So the answer will be: use a `Channel` if it gives you some functionality you need that `ConcurrentQueue` alone does not. – Bayern

The `Channel<T>` is slower than the combination `ConcurrentQueue<T>` + `SemaphoreSlim`, not than a `ConcurrentQueue<T>` alone. It's not obvious why it should be. Actually my expectation is that the `Channel<T>` should be faster, at least in a real-life scenario (the benchmark in the question is not). – Indetermination

The `ConcurrentQueue` implementation is signaling the semaphore only once for every 1,000 writes, while the `Channel` implementation will be doing whatever synchronization it does for every write (when it calls `TryWrite()`). – Bayern

Look at `UnboundedChannel.TryWrite()` and consider that this is being called for every write. – Bayern

The `ConcurrentQueue` will be lock-free, whereas the writes to the channel will always incur at least one lock. – Bayern

Writing to a `Channel<T>` incurs a `lock`, but the same is true for the `SemaphoreSlim`'s `WaitAsync` and `Release` methods (source code). – Indetermination

[…] the `ConcurrentQueue` implementation. Although you posted a link to the old implementation - the new `SemaphoreSlim` implementation doesn't necessarily lock. – Bayern

The linked source code is the `main` branch in the dotnet/runtime repository. It's effectively the upcoming .NET 8 implementation. AFAIK the `SemaphoreSlim` hasn't seen any significant implementation changes during the last 6-7 years. – Indetermination
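Regarding the batching suggestion in the first comment: passing whole batches through a `Channel<Item[]>` moves the synchronization from per-item to per-batch. A minimal sketch of what that could look like (class and member names here are illustrative, not from my project):

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public class MyBatchedChannel
{
    readonly Channel<Item[]> _channel = Channel.CreateUnbounded<Item[]>();
    Task _readerTask = null!;

    public void Setup() => _readerTask = Task.Run(ReaderAsync);

    private async Task ReaderAsync()
    {
        // One asynchronous wait per batch instead of per item.
        await foreach (var batch in _channel.Reader.ReadAllAsync())
        {
            foreach (var item in batch)
            {
                // do stuff ...
            }
        }
    }

    public void Write(Item[] batch) => _channel.Writer.TryWrite(batch);

    public void CompleteAndWaitForReader()
    {
        _channel.Writer.Complete();
        _readerTask.Wait(); // returns once the reader has drained the channel
    }
}
```

Whether this actually closes the gap to the `ConcurrentQueue` version would of course need measuring.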