When should System.Threading.Channels be preferred to ConcurrentQueue?

I recently built a producer/consumer system using ConcurrentQueue<T> and SemaphoreSlim. Then I made an alternative system using the new Channel<T> class from System.Threading.Channels.

After benchmarking both systems with BenchmarkDotNet, writing 1000 items 1000 times into each (and waiting for the reader to finish), I get the following results:

|      Method | ItemsCount | Iterations |        Mean |       Error |      StdDev |      Median |  Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
|     MyQueue |       1000 |       1000 | 19,379.4 us | 1,230.30 us | 3,569.33 us | 18,735.6 us | 8235.02 KB |
|   MyChannel |       1000 |       1000 | 45,858.2 us | 1,298.42 us | 3,704.46 us | 45,689.2 us |   72.11 KB |

The ConcurrentQueue implementation seems to be significantly faster than the Channel.

I tried setting SingleReader and SingleWriter to true when creating the Channel, but the results ended up being worse:

|      Method | ItemsCount | Iterations |        Mean |       Error |      StdDev |      Median |  Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
|     MyQueue |       1000 |       1000 | 18,578.7 us | 1,238.46 us | 3,493.10 us | 18,192.7 us | 8236.31 KB |
|   MyChannel |       1000 |       1000 | 50,506.9 us | 1,383.73 us | 3,857.28 us | 49,635.8 us |  170.73 KB |

Is there a flaw in my implementation or in the benchmark itself? If not, and these results are valid, when should Channels be preferred over a plain ConcurrentQueue?

The simplified code of both classes looks like this:

public class MyQueue
{
    ConcurrentQueue<Item> _queue;
    SemaphoreSlim _readerFinishedSemaphore;
    SemaphoreSlim _readSemaphore;

    bool _completed = false;

    public void Setup()
    {
        _queue = new();
        _readerFinishedSemaphore = new(0);
        _readSemaphore = new(0);

        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        while (true)
        {
            await _readSemaphore.WaitAsync();
            while (_queue.TryDequeue(out var item))
            {
                // do stuff ...
            }

            if (_completed) break;
        }

        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _queue.Enqueue(i);
        }

        _readSemaphore.Release();
    }

    public void CompleteAndWaitForReader()
    {
        _completed = true;
        _readSemaphore.Release();
        _readerFinishedSemaphore.Wait();
    }
}

And for Channels:

public class MyChannel
{
    Channel<Item> _channel = null!;
    SemaphoreSlim _readerFinishedSemaphore = null!;

    public void Setup()
    {
        _readerFinishedSemaphore = new(0);
        _channel = Channel.CreateUnbounded<Item>();

        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        var reader = _channel.Reader;

        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                // do stuff ...
            }
        }

        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _channel.Writer.TryWrite(i);
        }
    }

    public void CompleteAndWaitForReader()
    {
        _channel.Writer.Complete();
        _readerFinishedSemaphore.Wait();
    }
}
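
As an aside, the drain loop in MyChannel can also be written with ChannelReader<T>.ReadAllAsync (available since .NET Core 3.0), whose await foreach completes once the writer calls Complete() and the channel is empty. A minimal self-contained sketch, with an int payload purely for illustration:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

for (int i = 0; i < 3; i++)
    channel.Writer.TryWrite(i); // TryWrite always succeeds on an unbounded channel

channel.Writer.Complete();

int count = 0;
// Equivalent to the WaitToReadAsync/TryRead loop: consumes until completion.
await foreach (var item in channel.Reader.ReadAllAsync())
    count++; // do stuff ...

Console.WriteLine(count); // 3
```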

The benchmarking code looks like this:

// items are generated in [GlobalSetup] using fixed-seed Random class

[IterationSetup]
public void IterationSetup()
{
    myChannel = new MyChannel();
    myQueue = new MyQueue();

    myChannel.Setup();
    myQueue.Setup();
}

[Benchmark]
public void MyQueue()
{
    for (int i = 0; i < Iterations; i++)
        myQueue.Write(items);

    myQueue.CompleteAndWaitForReader();
}

// same for MyChannel

It should be noted that I am running this on .NET 8.0.0-preview.6.23329.4.

Candlemaker answered 1/8, 2023 at 8:36 Comment(14)
Is your benchmark representative of your actual scenario? In particular, does your actual scenario involve a Writer that writes a single batch of items and then completes? In case it does, I would suggest an even faster alternative: use a ConcurrentQueue<Item[]> or Channel<Item[]> and pass the single batch as an array. Or even simpler, make the Writer a Task<Item[]> method, and the Reader will just await the Writer() to get the single batch. – Indetermination
Note that a Channel uses a ConcurrentQueue as part of its implementation, so it can really only ever be slower. So the answer will be: use a Channel if it gives you some functionality you need that ConcurrentQueue alone does not. – Bayern
@MatthewWatson the question is why the Channel<T> is slower than the combination ConcurrentQueue<T> + SemaphoreSlim, not than a ConcurrentQueue<T> alone. It's not obvious why it should be. Actually my expectation is that the Channel<T> should be faster, at least in a real-life scenario (the benchmark in the question is not). – Indetermination
@TheodorZoulias Well it looks to me like the OP's ConcurrentQueue implementation is signaling the semaphore only once for every 1,000 writes, while the Channel implementation will be doing whatever synchronization it does for every write (when it calls TryWrite()). – Bayern
Look at the implementation of UnboundedChannel.TryWrite() and consider that this is being called for every write. – Bayern
You are probably right. The difference probably comes from signalling once per batch vs. once per item. I will be adjusting my benchmark later today and trying again. – Candlemaker
Also the writes to the ConcurrentQueue will be lock-free, whereas the writes to the channel will always incur at least one lock. – Bayern
Looks like you attempted to write an async producer/consumer wrapper around a synchronous producer/consumer queue, but the implementation is incorrect in a couple of places. If you're comparing async producer/consumer queues, first make the implementation correct and then compare for speed. – Irrefragable
@MatthewWatson yes, the OP's implementation simulates an unrealistic scenario, so the result of the benchmark is irrelevant. In a real-life scenario their code would be initially buggy, and after fixing the bugs it would be complex, unmaintainable, and most likely slower than the Channel<T>. – Indetermination
@MatthewWatson you are right that writing to a Channel<T> incurs a lock, but the same is true for the SemaphoreSlim's WaitAsync and Release methods (source code). – Indetermination
@TheodorZoulias Indeed - but the semaphore lock is only being taken once every 1,000 elements for the OP's ConcurrentQueue implementation. Although you posted a link to the old implementation - the new SemaphoreSlim implementation doesn't necessarily lock. – Bayern
@MatthewWatson yep, and that's why it's buggy and won't work in a real-life scenario. – Indetermination
(Actually, to be clear, the SemaphoreSlim only avoids using a lock if the timeout is 0, but in this case it's Timeout.Infinite.) – Bayern
@MatthewWatson I think that the link I posted points to the main branch of the dotnet/runtime repository. It's effectively the upcoming .NET 8 implementation. AFAIK the SemaphoreSlim hasn't seen any significant implementation changes during the last 6-7 years. – Indetermination

The main reason the ConcurrentQueue<T> implementation was performing faster was that it signalled only once per 1000 added items, while the Channel<T> did so for every item.

When I adjusted the benchmark to add the 1000 items one by one instead, making it fairer, the results were practically identical:

|    Method | ItemsCount |     Mean |    Error |   StdDev |   Median | Allocated |
|---------- |----------- |---------:|---------:|---------:|---------:|----------:|
|   MyQueue |       1000 | 163.8 us | 22.09 us | 64.44 us | 144.8 us |   8.42 KB |
| MyChannel |       1000 | 163.2 us | 14.02 us | 41.12 us | 177.9 us |   5.48 KB |

And at higher item counts the difference became more pronounced in favor of the Channel<T> implementation, notably also in terms of allocations:

|    Method | ItemsCount |      Mean |     Error |    StdDev |    Median | Allocated |
|---------- |----------- |----------:|----------:|----------:|----------:|----------:|
|   MyQueue |      10000 |  1.668 ms | 0.1971 ms | 0.5811 ms |  1.841 ms |  16.67 KB |
| MyChannel |      10000 |  1.163 ms | 0.1090 ms | 0.3197 ms |  1.121 ms |   9.92 KB |
|   MyQueue |     100000 | 10.906 ms | 1.1151 ms | 3.1995 ms | 11.850 ms |  65.17 KB |
| MyChannel |     100000 |  6.678 ms | 0.2506 ms | 0.7026 ms |  6.653 ms |   9.92 KB |

So I guess I will be sticking to Channel<T> for general producer/consumer scenarios.
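
For completeness, the adjusted per-item write path boils down to releasing the semaphore once per enqueued item instead of once per batch, so that both implementations pay a comparable synchronization cost per item. A runnable sketch of just the queue side (the names here are illustrative, not my exact benchmark code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

const int ItemCount = 1000;
var queue = new ConcurrentQueue<int>();
using var readSemaphore = new SemaphoreSlim(0);
int consumed = 0;

// Reader: wakes up once per Release and drains whatever is queued.
var reader = Task.Run(async () =>
{
    while (consumed < ItemCount)
    {
        await readSemaphore.WaitAsync();
        while (queue.TryDequeue(out _))
            consumed++; // do stuff ...
    }
});

// Writer: one Release per item, matching the per-item wakeup a Channel performs.
for (int i = 0; i < ItemCount; i++)
{
    queue.Enqueue(i);
    readSemaphore.Release();
}

await reader;
Console.WriteLine(consumed); // 1000
```

With this shape the queue version takes the semaphore lock once per item, which is why its advantage over Channel<T> disappears.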

Candlemaker answered 1/8, 2023 at 19:4 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.