How to cancel GetConsumingEnumerable() on BlockingCollection

In the following code I'm using a CancellationToken to wake up GetConsumingEnumerable() when the producer is not producing, so that I can break out of the foreach and exit the Task. But I don't see IsCancellationRequested being logged, and my Task.Wait(timeOut) waits for the full timeOut period. What am I doing wrong?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

later...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
Varistor answered 2/2, 2012 at 2:36 Comment(5)
Is your first snippet being run from an anonymous method? – Hive
I've run into similar problems when implementing a ThreadStart delegate. My solution was to set the local property as static (which is bad practice, I know, but it worked). That doesn't appear to be an option in your case, so I'm interested in reading your answers. – Hive
Well, I refactored the code to use a function rather than an anonymous function and the cancel is working. Task start now looks like this: userToken.Task = Task.Factory.StartNew(state => SubscribeTask(userToken, receiveSendEventArgs, request, tokenId), "TaskSubscribe", TaskCreationOptions.LongRunning); – Varistor
I started a bounty to help drive a definitive answer for the cause. The behavior we both have seen so far makes no sense. Hopefully someone will be able to elaborate. – Hive
@Varistor Not really refactoring, as you changed the binary's behaviour. – Nog
S
14

You could use CompleteAdding() which signifies that no more items will be added to the collection. If GetConsumingEnumerable is used, the foreach will end gracefully as it will know there's no point in waiting for more items.

Basically once you have finished adding items to the BlockingCollection just do:

myBlockingCollection.CompleteAdding();

Any threads which are doing a foreach loop with GetConsumingEnumerable will stop looping.
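
As a rough illustration of the behaviour described above, here is a minimal sketch. The names CompleteAddingDemo and ConsumeAll are hypothetical and not from the question; only BlockingCollection, GetConsumingEnumerable and CompleteAdding are the real API being shown.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class CompleteAddingDemo   // hypothetical name, for illustration only
{
    // Consumes until the collection is marked complete and drained;
    // returns how many items were seen.
    public static int ConsumeAll(BlockingCollection<string> queue)
    {
        int count = 0;
        foreach (var item in queue.GetConsumingEnumerable())
        {
            count++;
        }
        return count;   // reached only after CompleteAdding() and an empty queue
    }

    static void Main()
    {
        var queue = new BlockingCollection<string>();
        var consumer = Task.Factory.StartNew(() => ConsumeAll(queue));

        queue.Add("a");
        queue.Add("b");
        queue.CompleteAdding();   // no more items: the consumer's foreach ends

        Console.WriteLine(consumer.Result);
    }
}
```

Note that this only helps when the producer is able to call CompleteAdding(); if the producer itself is stalled, the consumer needs another way out, such as a cancellation token.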

Soffit answered 27/9, 2012 at 11:12 Comment(2)
I am using CompleteAdding() in the producer thread, but if no items are being added the producer thread is blocked. The cancellation token timeout on the consumer thread is saying "I haven't heard from the producer for a long time so I'm cancelling myself". I don't know if this is a bad pattern, as there seems to be an implicit rule that the producer owns the queue, but we've been given cancellation on the consumer so I'm trying to do it from both ends. – Varistor
I know this is old, but this actually doesn't work in .NET Core. I'm not sure if it's a bug, but that was the expected behaviour, except it doesn't seem to work. – Eugeneeugenia
N
6

I've created a quick prototype, and it seems to work for me.

Note the Thread.Sleep(1000) right before the token cancel request. It is possible that you are creating a race condition on the token variable, since you create and access the CancelToken variable in different threads.

E.g. the code that is intended to cancel the task might invoke Cancel on the wrong (previous or null) cancellation token source.

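using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static void Main(string[] args)
{
    // Assigned inside the task, so the main thread may see null until the task has started.
    CancellationTokenSource token = null;
    BlockingCollection<string> coll = new BlockingCollection<string>();
    var t = Task.Factory.StartNew(state =>
    {
        token = new CancellationTokenSource();
        try
        {
            // Blocks while the collection is empty; cancelling the token wakes it up.
            foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation during a blocked wait surfaces here.
            Console.WriteLine("Cancel");
            return;
        }
    }, "TaskSubscribe", TaskCreationOptions.LongRunning);
    // Gives the task time to create the token source before Cancel() is called.
    Thread.Sleep(1000);
    token.Cancel();
    t.Wait();
}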
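
A possible variant without the Thread.Sleep, sketched under the assumption that the CancellationTokenSource can be created before the task starts. CancelConsumerDemo and Consume are hypothetical names used only for illustration. The sketch also shows that a blocked GetConsumingEnumerable(token) is woken by an OperationCanceledException, so the loop body never gets a chance to check IsCancellationRequested.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class CancelConsumerDemo   // hypothetical name, for illustration only
{
    // Returns true if the loop ended through cancellation,
    // false if it ended because CompleteAdding() was called.
    public static bool Consume(BlockingCollection<string> queue, CancellationToken token)
    {
        try
        {
            foreach (var item in queue.GetConsumingEnumerable(token))
            {
                Console.WriteLine(item);
            }
            return false;
        }
        catch (OperationCanceledException)
        {
            // Cancellation while waiting for an item is reported by throwing,
            // not by letting the loop body run and poll the flag.
            return true;
        }
    }

    static void Main()
    {
        var queue = new BlockingCollection<string>();

        // Created before the task starts, so Cancel() cannot hit a null or stale source.
        using (var cts = new CancellationTokenSource())
        {
            var consumer = Task.Factory.StartNew(
                () => Consume(queue, cts.Token), TaskCreationOptions.LongRunning);

            cts.Cancel();

            // Wait with a timeout, as in the question, then report how the loop ended.
            Console.WriteLine(consumer.Wait(TimeSpan.FromSeconds(5)));
            Console.WriteLine(consumer.Result);
        }
    }
}
```

Passing the token into the consumer as a parameter, rather than sharing a field that the task assigns, removes the window in which the cancelling thread could see a different source than the one the consumer is waiting on.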
Nitramine answered 4/2, 2012 at 3:39 Comment(13)
Now move your anonymous method to a standalone method. Is the Thread.Sleep still necessary? If not, why? – Hive
Yes, it would still be necessary (and it's smelly code). To get rid of it you would need to move new CancellationTokenSource() somewhere before task creation. – Nitramine
Then it doesn't answer the question. The OP was able to solve their problem by moving the anonymous method into a non-anonymous one. The question is why? – Hive
I understand the possibility of a race condition in the code by Valery Koluppaev, but I can assure you that in my prod code the Task gets started and runs, so the position of the newing of the CancellationTokenSource is not a factor. – Varistor
After some more reading I think I may be using the cancel token with GetConsumingEnumerable() incorrectly. Unless I call ThrowIfCancellationRequested(), calling Cancel() will just set the IsCancellationRequested flag. As I am using Cancel() to wake up GetConsumingEnumerable() when it is blocked and waiting for the producer to Add() something, I'll never poll the flag. After calling ThrowIfCancellationRequested() I'll get the exception I'm waiting for. I'll try it out on Monday. – Varistor
@Spud, the behavior you've described seems very weird to me (that part with the static method). Can you provide more context, or try to inline that function back? Maybe it is a hidden race condition, which you removed by extracting the method body to a separate class? – Nitramine
@Varistor Actually, ThrowIfCancellationRequested() throws the exception immediately. I guess BlockingCollection polls the token internally and throws an exception if cancellation is requested. – Nitramine
That's the behaviour I want. My client which consumes the enumerable is on a socket. When the socket closes (hours later, no race condition) I want to shut down the producer and consumer. Setting the complete flag stops the consumer and stops the consumer if the producer is adding. If the producer is stalled I need to wake up the consumer so I can exit the Task, so throwing immediately (synchronously) is perfect. The reading I've done this weekend emphasises that the new cancellation model is cooperative to allow graceful shutdown, but for a waiting Task I need something a little more forceful ;-) – Varistor
Well, I've fixed my problem but I don't think you're going to like the answer. I wrapped the Wait() in another Task. For me, anyway, it seems that the Wait() was running on the same thread as the Task and blocking it. Looks like other people have had this behaviour too. Maybe I would get different behaviour on my quad core laptop rather than my dual core machine here at work. I've even been able to change the Task back to an anonymous method. I guess I'm going to have to look into Task scheduling now. – Varistor
Hm, that's really weird. The only way that problem makes sense to me is a hidden race condition. It's possible that you masked it by wrapping cancel/wait into the task. That may have postponed the execution (and made the race condition less frequent) or reduced the degree of parallelism by sharing the same thread pool with a concurrent task. I've spent plenty of time looking through the BlockingCollection source code, and it seems rock-solid to me. – Nitramine
Btw, can you please provide an example that can reproduce the problem out of context? – Nitramine
I'll award the bounty to you because (besides being the only answer) you stuck with @Varistor through the problem. In the end our problems ended up being much different, so it's 150 rep for something I could care less about. Lesson learned, I suppose. – Hive
@Hive It is working in a non-anonymous method due to the way the iterator state machine is constructed and stored by the compiler. See blogs.msdn.microsoft.com/oldnewthing/20080812-00/?p=21273 and Eric Lippert's blog. I bet the iterator state and the capturing of the cancellation token do not mix well in anonymous methods, but that can be discovered only by looking at the generated IL. – Shakira
