How to detect AddingCompleted of a BlockingCollection without race condition and exception?

I'm using a BlockingCollection<T> that is filled by exactly one thread and consumed by exactly one thread. Producing and consuming items works fine. The problem is at the end of this operation. The consuming task blocks (as expected) in GetConsumingEnumerable. After CompleteAdding is called, the task disposes the BlockingCollection and finishes without any exceptions. So far so good.

Now I have a thread that adds items to the BlockingCollection. This thread has to check IsAddingCompleted and then add the item. But there's a race condition between asking for IsAddingCompleted and adding the item. There's a TryAdd method, but it also raises an exception if adding is already completed.

How can I add an item, or test whether adding is completed, without an additional lock? Why does TryAdd throw an exception at all? Returning false would be fine if adding is already completed.

The very simplified code looks like this:

private BlockingCollection<string> _items = new BlockingCollection<string>();

public void Start()
{
    Task.Factory.StartNew(
        () =>
            {
                foreach (var item in this._items.GetConsumingEnumerable())
                {
                }

                this._items.Dispose();
            });

    Thread.Sleep(50); // Wait for Task

    this._items.CompleteAdding(); // Complete adding
}

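public void ConsumeItem(string item)
{
    // Race: CompleteAdding (and Dispose) can run between this check and the Add call,
    // in which case Add throws even though the check passed.
    if (!this._items.IsAddingCompleted)
    {
        this._items.Add(item);
    }
}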

Yes, I know that this code doesn't make sense, because there's nearly no chance to add any item and the foreach loop does nothing. The consuming task doesn't matter for my problem.

The problem is shown in the ConsumeItem method. I could add an additional lock (semaphore) around ConsumeItem and CompleteAdding+Dispose, but I'd like to avoid the performance impact.
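
For reference, the lock-based variant described above might look roughly like the following sketch. The names GuardedQueue, TryEnqueue, Complete, Consume, _gate and _completed are hypothetical and not part of the original code. It assumes an unbounded collection (so Add never blocks while the lock is held) and tracks completion in its own flag, so the producer never touches the collection once the consumer may have disposed it.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical wrapper: serializes "check + add" against "complete".
public sealed class GuardedQueue
{
    private readonly object _gate = new object();
    private readonly BlockingCollection<string> _items = new BlockingCollection<string>();
    private bool _completed; // guarded by _gate

    // Returns false (and drops the item) once Complete has been called.
    public bool TryEnqueue(string item)
    {
        lock (_gate)
        {
            if (_completed)
            {
                return false;
            }

            _items.Add(item); // unbounded, so this does not block under the lock
            return true;
        }
    }

    // Marks the queue as complete; later TryEnqueue calls return false.
    public void Complete()
    {
        lock (_gate)
        {
            if (_completed)
            {
                return;
            }

            _completed = true;
            _items.CompleteAdding();
        }
    }

    // The consumer enumerates until adding is completed and the queue is drained.
    public IEnumerable<string> Consume() => _items.GetConsumingEnumerable();
}
```

The cost is one uncontended lock per added item, which is the overhead the question is trying to avoid.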

How can I add items without any exceptions? Losing items is fine once adding has been completed.
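
One lock-free fallback is to let TryAdd throw and translate the exception into a false return value. The sketch below is only an illustration of that idea. TryAddOrDrop is a hypothetical helper name, and the approach still pays the cost of a thrown exception in the late-add case. It relies on TryAdd throwing InvalidOperationException once adding is completed, and on ObjectDisposedException deriving from InvalidOperationException.

```csharp
using System;
using System.Collections.Concurrent;

public static class BlockingCollectionExtensions
{
    // Hypothetical helper: returns false instead of throwing when the
    // collection no longer accepts items.
    public static bool TryAddOrDrop<T>(this BlockingCollection<T> collection, T item)
    {
        try
        {
            return collection.TryAdd(item);
        }
        catch (InvalidOperationException)
        {
            // Adding was completed, or the collection was already disposed
            // (ObjectDisposedException is a subclass of InvalidOperationException).
            return false;
        }
    }
}
```

With this helper, ConsumeItem could call this._items.TryAddOrDrop(item) without checking IsAddingCompleted first. Note that the documentation describes Dispose as the one member of BlockingCollection<T> that is not thread-safe, so calling Dispose while a producer may still be inside TryAdd remains risky; skipping or deferring the Dispose call is one way around that.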

Paramaribo answered 1/7, 2014 at 6:44 Comment(3)
I guess that you want to avoid the cost of catching the exception thrown by the TryAdd. You can look for a possible solution here. The idea is to reimplement the BlockingCollection<T> based on a BufferBlock<T>. It's easier than it sounds. - Raychel
@TheodorZoulias Oh wow - I asked that more than 6 years ago. During that time I gave up waiting for a solution. But I switched to System.Threading.Channels. - Paramaribo
This is a wise choice. Myself I dug into the oldest BlockingCollection<T>-related questions out of boredom. Some are still quite interesting. :-) - Raychel
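
The switch to System.Threading.Channels mentioned in the comments could look roughly like the sketch below. ChannelQueue, TryEnqueue, Complete and ConsumeAllAsync are hypothetical names chosen for illustration, and the code is not the asker's actual solution. It relies on ChannelWriter<T>.TryWrite returning false, rather than throwing, once the writer has been completed, which is the behavior the question asks for.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical wrapper around an unbounded channel.
public sealed class ChannelQueue
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();

    // Returns false (no exception) once the channel has been completed.
    public bool TryEnqueue(string item) => _channel.Writer.TryWrite(item);

    // TryComplete is safe to call more than once; it returns false on repeats.
    public void Complete() => _channel.Writer.TryComplete();

    // Drains the channel until it is completed and empty; returns the item count.
    public async Task<int> ConsumeAllAsync()
    {
        var count = 0;
        var reader = _channel.Reader;

        // WaitToReadAsync yields false once the channel is completed and drained.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                count++; // process the item here
            }
        }

        return count;
    }
}
```

There is nothing to dispose in this variant, so the producer cannot hit a disposed-object error either.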
