Using BlockingCollection<T>: OperationCanceledException, is there a better way?

I'm making use of the (frankly great) BlockingCollection<T> type for a heavily multithreaded, high-performance app.

There's a lot of throughput through the collection and on the micro-level it's highly performant. However, for each 'batch' it will always be ended by flagging the cancellation token. This results in an exception being thrown on any waiting Take call. That's fine, but I would have settled for a return value or output parameter to signal it, because a) exceptions have an obvious overhead and b) when debugging, I don't want to manually turn off break-on-exception for that specific exception.

The implementation seems intense, and in theory I suppose I could disassemble and recreate my own version that didn't use exceptions, but perhaps there's a less complex way?

I could add a null (or if not, a placeholder) object to the collection to signify the process should end, however there also needs to be a means to abort nicely, i.e. wake up waiting threads and tell them somehow that something's gone on.

So - alternative collection types? Recreate my own? Some way to abuse this one?

(Some context: I went with BlockingCollection<T> because it has an advantage over manual locking around a Queue. As best I can tell the use of threading primitives is superb, and in my case a few milliseconds here and there and optimal core use are crucial.)

Edit: I've just opened a bounty for this one. I don't believe Anastasiosyal's answer covers the query I raise in my comment on it. I know this is a tough problem. Is anyone able to assist?

Beltane answered 21/1, 2012 at 13:37 Comment(28)
Would the CompleteAdding method work for you? - Vegetable
I don't think so; it would still throw an exception on Take, and I need to be able to signal an abort from somewhere other than the producer. I may be missing a trick, though. - Beltane
I wonder what the behavior of the enumerator returned by GetConsumingEnumerable is. Maybe MoveNext blocks by default and returns false if it's all cancelled! Does anyone know about that? - Beltane
Never mind. It's an iterator block (it uses yield) which still seems to throw an exception, at least internally, possibly further. - Beltane
Use TryTake instead of Take? - Vegetable
Tbh, after re-reading your question a few times, I still don't get what you're trying to achieve. Can you add an example or something? - Vegetable
GetConsumingEnumerable returns an enumerable that blocks until a new item is added to the BlockingCollection. I don't think it uses exceptions internally, since it would be just as easy to check the Count value internally. - Representational
There's a nice example with a BlockingCollection here: albahari.com/threading/part5.aspx#_BlockingCollectionT - Representational
@Vegetable - well, there's a producer of stuff, a consumer of stuff, and a requirement to cancel any waiting operations externally. One thread is using Add, the other Take, and potentially any thread is flagging the CancellationToken. @Representational - check the source: it's using yield and calling the same underlying methods, i.e. exceptions are used. - Beltane
@Vegetable - yes, I'm a little confused as well. If the OP wants to add a suicide request (poison pill) to the collection, it could be done from any thread. - Interception
@Representational - I am using it successfully; the question is quite specific and not about general use. There's nothing in there remotely relevant that I can see? - Beltane
Actually, I figure the question is fairly clear. The implementation of BlockingCollection uses exceptions to signal to anyone involved that the operation was cancelled. I'm looking for a way to get exactly the same functionality without exceptions, for example via a value returned from the methods or an output parameter. [I even gave a reason: throwing an exception takes a few milliseconds, which I'm trying to claw back] - Beltane
If work arrives continually in 'batches', why bother with cancelling at all? Won't you need the threads again soon for the next batch? - Interception
Nope, they're completely separate tasks. A "plan" is put together, started, items are posted around it, then it's stopped, aborted or finalised. There might be many plans per second, but they are completely isolated and use separate collections. - Beltane
@KierenJohnstone Sorry, in that case I don't really understand what you want... - Representational
Then could I suggest not trying to answer a question you don't know the answer to? :) I appreciate your attention though! - Beltane
@KierenJohnstone, have you considered deriving and then having the descendant class simply issue a Clear on the queue? - Gentry
OK, so you have a set of threads per plan/collection/whatever. You can shove onto the queue any command you want from any thread (stop/pause/continue/abort/whatever), so avoiding the need for exceptions. I'm not convinced that such an approach would improve overall performance, however. The consumers would have to process all the plan requests already in the queue before they could execute your command, though a 2-level priority queue (i.e. 2 queues) could alleviate that. Hmm... another issue is being sure that the request you intended to stop/pause is the one currently being run... - Interception
I guess the answer is simple: don't cancel operations if this occurs so frequently that it hurts performance. For example, add a sentinel object to the queue and let the consumers consume the queue up to the object instead of aborting the consumers immediately. - Vegetable
Unfortunately, that doesn't work. If the queue is full and the producer is calling Add, then there's no way to cancel by attempting to add another item. - Beltane
When it comes down to it, there's a design decision chosen by the Framework designers to depend on CancellationToken.ThrowIfCancellationRequested internally in methods that use it. I'm just asking if there's an alternative implementation or way of achieving the same thing without doing that. Rewriting CancellationToken, BlockingCollection<> and SemaphoreSlim is the only thing I've come up with so far. - Beltane
You might want to consider redesigning around the TPL, which uses work-stealing queues; this in turn could provide your desired scalability along with good cancellation support. (Take this with a grain of salt: while I've read a bunch on TPL, I don't have concrete experience to share, but the theory sure seems great.) - Barbecue
Hi Chris, thanks, but each queue is a buffer for a specific type of processing operation, and scalability in fact isn't what this is used for. Work stealing won't help me (but thanks again). - Beltane
I don't understand your concern with exceptions. You don't have 1000 exceptions in a loop or something. Cancelling will generate one exception per thread, so any overhead is insignificant. Reimplementing something that works very well just to avoid pressing F5 in the debugger does not seem productive. - Eileen
There might be 10 threads (and so ~20 exceptions) occurring in some peak situations at a rate of 1,000 plans per second (~20,000 exceptions per second). Making debugging a complex system easier is a goal that I think is viable, as is trying to make the bottleneck of this system not be dealing with queuing/dequeuing, as the profiler indicates it currently is. - Beltane
Did you solve this in the end? - Swiss
@stt106 - no, unfortunately not. - Beltane
Not entirely sure whether it's relevant, but I just read that the .NET team is aware of this and that they in fact throw this exception purposefully. Some have asked them to change this implementation but they refused to do so. connect.microsoft.com/VisualStudio/feedback/details/631951/… - Swiss

As I guess you have already done yourself, looking into the reflected source of BlockingCollection, it unfortunately looks like you will get an OperationCanceledException whenever a CancellationToken that was passed into the BlockingCollection is cancelled, as can be seen in the image below (a couple of workarounds follow the image).

GetConsumingEnumerable invokes TryTakeWithNoTimeValidation on the BlockingCollection which in turn raises this exception.

[Image: screenshot of the reflected BlockingCollection source; not reproduced]

Workaround #1

One potential strategy, assuming you have more control over your producers and your consumers: rather than passing the cancellation token into the BlockingCollection (which will raise this exception), pass the cancellation token into your producers and into your consumers.

If your producers aren't producing and your consumers aren't consuming, then you have effectively cancelled the operation without raising this exception, while passing CancellationToken.None to your BlockingCollection.

Special cases: cancelling when the BlockingCollection is at BoundedCapacity or empty

Producers blocked: The producer threads will be blocked when BoundedCapacity on the BlockingCollection is reached. In that state your consumers are not blocked, but your producers are, because they cannot add any more items to the queue. To cancel, you therefore need to let additional items be consumed (one for each producer thread). That unblocks the producers and in turn allows your cancellation logic to kick in on the producer side.

Consumers blocked: When your consumers are blocked because the queue is empty, you could insert an empty unit of work (one for each consumer thread) into the BlockingCollection so as to unblock the consumer threads and allow your cancellation logic to kick in on the consumer side.

When there are items in the queue and neither limit (BoundedCapacity or empty) has been reached, the producer and consumer threads should not be blocked.
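A minimal sketch of Workaround #1 from the consumer side, under stated assumptions: the helper name `ConsumeUntilCancelled` and the 50 ms poll interval are hypothetical, not part of the original answer. Instead of inserting wake-up items, it uses the timed `TryTake` overload so the consumer re-checks its own token periodically. Note that this is polling, which the question explicitly considers too slow, so treat it as an illustration of where the token goes rather than as a recommendation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class TokenOutsideCollection
{
    // Hypothetical helper: handles items from 'queue' until 'token' is cancelled
    // and returns how many items were handled. The token is never handed to the
    // collection, so cancelling it does not surface as an exception from TryTake.
    public static int ConsumeUntilCancelled<T>(
        BlockingCollection<T> queue,
        CancellationToken token,
        Action<T> handle,
        int pollMilliseconds = 50) // assumed poll interval; tune for your latency needs
    {
        int handled = 0;
        while (!token.IsCancellationRequested)
        {
            // The timed TryTake returns false on timeout instead of throwing.
            if (queue.TryTake(out T item, pollMilliseconds))
            {
                handle(item);
                handled++;
            }
        }
        return handled;
    }
}
```

The trade-off is that a blocked consumer notices the cancellation only after its current poll interval elapses.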

Workaround #2

Using a cancellation unit of work.

When your application needs to cancel, your producers (maybe just 1 producer will suffice while the others simply stop producing) will produce a cancellation unit of work (this could be null, as you also mention, or some class that implements a marker interface). When the consumers consume this unit of work and detect that it is in fact a cancellation unit of work, their cancellation logic kicks in. The number of cancellation units of work to be produced needs to equal the number of consumer threads.

Again, caution is needed when we are close to BoundedCapacity, as it could be a sign that some of the producers are blocked. Depending on the number of producers/consumers you could have a consumer consuming until all producers (but 1) have shut down. This ensures that there are no lingering producers around. When there is only 1 producer left, your last consumer can shut down and the producer can stop producing cancellation units of work.
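The cancellation unit of work can be sketched roughly as follows. All names here (`WorkItem`, `WorkItem.Stop`, `SentinelDemo`) are hypothetical and chosen only for illustration; a shared marker instance is used instead of null so it can be recognised by reference.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public sealed class WorkItem
{
    // Hypothetical marker instance; consumers compare against it by reference.
    public static readonly WorkItem Stop = new WorkItem("<stop>");

    public string Payload { get; }
    public WorkItem(string payload) { Payload = payload; }
}

public static class SentinelDemo
{
    // Consumer loop: Take blocks while the queue is empty and the loop exits
    // when the marker arrives. No CancellationToken is involved.
    public static List<string> Consume(BlockingCollection<WorkItem> queue)
    {
        var seen = new List<string>();
        while (true)
        {
            WorkItem item = queue.Take();
            if (ReferenceEquals(item, WorkItem.Stop)) break;
            seen.Add(item.Payload);
        }
        return seen;
    }

    // Enqueues one marker per consumer thread. Caveat: Add itself blocks when a
    // bounded queue is full, and the markers sit behind any queued work.
    public static void RequestStop(BlockingCollection<WorkItem> queue, int consumerCount)
    {
        for (int i = 0; i < consumerCount; i++) queue.Add(WorkItem.Stop);
    }
}
```

As the comments on this answer point out, this does not by itself cancel a producer that is blocked on a full queue.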

Schlueter answered 26/1, 2012 at 11:27 Comment(3)
So what would wake up the producer/consumer threads? They would still be blocked in the BlockingCollection call? - Beltane
@KierenJohnstone Good remark, I've edited the post to include handling for these cases when producers or consumers are blocked. - Schlueter
If someone feels this answers the question, please let me know how. I don't want to waste the bounty on this; the reason I opened the bounty is because it does not work. E.g. nothing in the workarounds describes how to cancel an add operation that is blocked because the queue is full. - Beltane

How about the BlockingQueue I did a while ago?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

It should do fine without any exceptions. The current queue simply closes the event on dispose, which might not be what you want. You might want to enqueue a null and wait until all items have been processed. Apart from this it should suit your needs.

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Dequeues an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immediately
        /// return null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference, which causes all waiters to be released.
        /// They will then get a null reference as the queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits until the queue is empty. This does not mean that all items have already been processed, only that
        /// the queue contains no more pending work.
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns an untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}
Blondie answered 16/6, 2012 at 10:52 Comment(6)
Sorry, way off the mark for what I need. Monitor.Wait is massively slower than BlockingCollection, and you're even adding a minimum latency of 45ms if it has to wait for an item. Performance is just not good enough here (unless I am missing something?). You don't explicitly say how to cancel (is it with that bool flag?) and the queue isn't of fixed length! I need Enqueue to block if the queue is full, but be cancellable! (Kind of the whole point!) - Beltane
Hi Kiren, did you measure it? And no, 45ms is not the minimum latency but the exact opposite. Cancellation is done via an Enqueue(null), which will lead to processing of all elements until the null element is dequeued. You are right that it is not bounded, but that is easy to add as a check during the enqueue operation. If you get into saturation quite often, then the difference between lock-free collections and collections with explicit locking will not matter anymore, since you are waiting for the items to be processed by your processor. - Blondie
Hi Alois, if your Enqueue is blocking because the queue is full, it's not possible to enqueue an additional null. The enqueue operation needs to block but be cancellable, too. Can you suggest anything for that? - Beltane
I would do a check for null in my Enqueue method and enqueue this anyway regardless of the current queue count. I know it sounds rather obvious, but that is the way I would do it. Nothing bad will happen if you have one more element in the queue than the max count (at least not if you do not try some really weird stuff). - Blondie
But the null will be at the back of the queue. So it won't cancel immediately; it would process the entire queue (20,000 items in this case) before cancelling. That's not a 'cancel' operation, is it? - Beltane
You could add a ClearAndCancel method which takes a lock on the queue, clears the queue and then enqueues a null reference. The effect will be the one you want. As an added bonus, your object references will be cleaned up faster by the GC. - Blondie

You could signal the end of a batch by setting a flag on the last item (add an IsLastItem bool property to it, or wrap it). Or you might send a null as the last item (not sure if a null goes through the BlockingCollection correctly, though).

If you can remove the need for the 'batch' concept, you can create an extra thread that continuously calls Take() and processes new data from your BlockingCollection and does nothing else.

Jimenez answered 19/6, 2012 at 15:26 Comment(5)
How does this handle cancellation when the queue is blocked because it is full? Because it needs to. - Beltane
I never set a maximum capacity (at the risk of running out of memory). But if you do, I see no way to avoid errors when cancelling a blocking Add that is waiting. You could change to using TryAdd() in a loop, so you have a non-blocking Add. - Jimenez
This is the crux of the issue, and why this is difficult to solve. Loops/polling = slow. Far, far slower than BlockingCollection<>. The only thing I need is for BlockingCollection<> not to use exceptions. It's way faster than looping when adding, but uses exceptions when cancelled. THAT's what I want not to happen. - Beltane
TryAdd() has an optional timeout parameter. So you could use a loop which does a TryAdd() for a second (blocking briefly) and checks for cancellation only once per second. You will need to restructure cancelling so you are not in a TryAdd() call when a cancel is raised, or it will probably still throw an exception. - Jimenez
The second example on this page uses a timed TryAdd(): msdn.microsoft.com/en-us/library/dd997306.aspx - Jimenez

Kieren,

From my inspection, I personally don't know of any thread-safe type for the producer-consumer pattern which does exactly what you want. I don't claim this is a competitive solution, but I propose you decorate BlockingCollection<T> with a few extension methods, which will give you the freedom to supply any built-in or custom type instead of the default CancellationToken.

Stage 1:

The following is the list of default methods which use the underlying TryAddWithNoTimeValidation method to add to the queue.

public void Add(T item){
    this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
    this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
}

public bool TryAdd(T item){
    return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
}

public bool TryAdd(T item, TimeSpan timeout){
    BlockingCollection<T>.ValidateTimeout(timeout);
    return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout){
    BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
    return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
    BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
    return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

Now you can provide an extension for any or all of the methods you are interested in.

Stage 2:

You now refer to your own implementation of TryAddWithNoTimeValidation instead of the default.

I can give you an alternate version of TryAddWithNoTimeValidation which safely continues without throwing OperationCanceledException.
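The alternate version mentioned above is not shown in this answer. Purely as an illustration of the extension-method idea, and not as that version, here is a hypothetical extension (`TryAddUnlessCancelled`, with an assumed 100 ms slice) that keeps the token outside the collection and uses the public timed `TryAdd` overload, so a cancellation is reported as a `false` return value. It polls in slices, which the question's author considers too slow for their case.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingCollectionNoThrowExtensions
{
    // Hypothetical extension method. Returns true once the item has been added,
    // or false if 'token' was cancelled before space became available.
    // The token is only polled here, never passed to the collection.
    public static bool TryAddUnlessCancelled<T>(
        this BlockingCollection<T> collection,
        T item,
        CancellationToken token,
        int sliceMilliseconds = 100) // assumed slice length between token checks
    {
        while (!token.IsCancellationRequested)
        {
            // Timed TryAdd returns false on timeout when the collection is full.
            // It still throws InvalidOperationException after CompleteAdding().
            if (collection.TryAdd(item, sliceMilliseconds)) return true;
        }
        return false;
    }
}
```

A producer blocked on a full queue then notices the cancellation within one slice, at the cost of waking up once per slice while waiting.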

Gompers answered 20/6, 2012 at 13:23 Comment(7)
Saying I can use extension methods is kind of a coding suggestion. What I'm asking for is how to implement this with high performance without using cancellation tokens. This answer just tells me how to use extension methods, which I already know how to use. - Beltane
Deleted my answer. Obviously you are a theorist. I am a practitioner. I know how to make my app run with 30us latency. I tried to help but you want to talk theoretically. This is not my cup of tea. Good luck. - Illustrative
@Bobb - hmm, well, I am talking very practically, I think. Both of your pieces of code would have .NET throwing exceptions inside the cancellation method. Throwing an exception is an actual thing that happens, and is the bit I want to remove. That's very practical, isn't it? - Beltane
No. It will throw an exception and I never said it won't. What I did was practical - I heard the voice of the populi and went back to the whiteboard and actually modelled MY situation. And I found that for MY blocking queue an exception works FASTER than the if(someFlag) break; thingy..... What you are doing is sitting next to the display and typing - I think it is slow. I think it is slow. I think it is slow........ A practical approach would be to - Illustrative
a) make your own benchmark and acknowledge that the exception is faster. b) understand the fact that the exception catch happens once per bunch and the bool flag check happens on each item iteration. c) redesign your worker to have a low number of exceptions and use the accumulated savings of the exception vs bool flag towards your computation rather than if(flag) waste..... - Illustrative
I'll make it clear: internally, the BlockingCollection<> uses exceptions. I can't "acknowledge that the exception is faster", because I can't modify the source of BlockingCollection<>. I cannot redesign my worker to have a lower number of exceptions, because it's not my worker that throws the exceptions. BlockingCollection<> does, internally. That is my issue. How do you propose I turn off BlockingCollection<>'s exceptions so I can measure without them? - Beltane
@Bob: throwing exceptions is much slower than a simple bool check. There is no point in debating that. If you have measured that throwing exceptions is faster than a bool check, I know that your perf tests and the conclusions you derived from them are flawed. You do not need to take my word for it; simply debug an if(flag) return else xxxx compared to if(flag) throw new Exception() else xxxx with disassembly enabled. During the exception throw and stack unwind, many more assembly instructions are executed than for the bool check (including some bool checks there as well)... - Blondie

My suggestion is to implement this functionality by encapsulating an asynchronous queue, like the BufferBlock<T> class from the TPL Dataflow library. This class is a thread-safe container intended for producer-consumer scenarios, and supports backpressure (BoundedCapacity) just like the BlockingCollection<T> class. Being asynchronous means that the corresponding Add/Take methods (SendAsync/ReceiveAsync) return tasks. These tasks store a cancellation as internal state that can be queried with the IsCanceled property, so throwing exceptions internally can be avoided. Propagating this state with exceptions can also be avoided, by waiting on the tasks through an exception-suppressing continuation (ContinueWith). Here is an implementation:

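using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>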
/// A thread-safe collection that provides blocking and bounding capabilities.
/// The cancellation is propagated as a false result, and not as an exception.
/// </summary>
public class CancellationFriendlyBlockingCollection<T>
{
    private readonly BufferBlock<T> _bufferBlock;

    public CancellationFriendlyBlockingCollection()
    {
        _bufferBlock = new BufferBlock<T>();
    }

    public CancellationFriendlyBlockingCollection(int boundedCapacity)
    {
        _bufferBlock = new BufferBlock<T>(new() { BoundedCapacity = boundedCapacity });
    }

    public bool TryAdd(T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) return false;
        if (_bufferBlock.Post(item)) return true;
        Task<bool> task = _bufferBlock.SendAsync(item, cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        return task.Result;
    }

    public bool TryTake(out T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) { item = default; return false; }
        if (_bufferBlock.TryReceive(out item)) return true;
        Task<T> task = _bufferBlock.ReceiveAsync(cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        item = task.Result; return true;
    }

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        while (TryTake(out var item, cancellationToken)) yield return item;
    }

    public void CompleteAdding() => _bufferBlock.Complete();
    public bool IsCompleted => _bufferBlock.Completion.IsCompleted;
    public int Count => _bufferBlock.Count;

    // Wait the task to complete without throwing exceptions
    private static void WaitNoThrow(Task task)
    {
        if (task.IsCompleted) return;
        task.ContinueWith(_ => { }, default,
            TaskContinuationOptions.ExecuteSynchronously |
            TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Wait();
        Debug.Assert(task.IsCompleted);
    }
}

Performance: The CancellationFriendlyBlockingCollection.TryTake method can be invoked with a canceled CancellationToken in a loop at a frequency of about 15,000,000 times per second on my PC (on a single thread). For comparison, the frequency of BlockingCollection<T>.Take under the same conditions is about 20,000 times per second.

You might be tempted to replace the BufferBlock<T> with a more modern asynchronous queue like the Channel<T>. In that case please make sure to read this question first, in order to be aware of a leaky behavior of this class under specific conditions.

Litre answered 21/11, 2021 at 18:31 Comment(1)
Note, the original implementation attempted to block synchronously by using the SuppressException struct by Stephen Toub. This was a bug, because this struct can only be used with await. - Litre
