Cancelling BlockingCollection.GetConsumingEnumerable() and processing what's left
Asked Answered
C

3

7

I have one process generating work and a second process with a BlockingCollection<> that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but hadn't been consumed.

Right now, my consumer spawns a thread that has a foreach (<object> in BlockingCollection.GetConsumingEnumerable()) loop. When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding(). What I find is that my consumer continues to process everything in the queue.

Googling the issues tells me that I need to use a CancellationToken. So I tried that out:

private void Process () { // This method runs in a separate thread
    try {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCancelledException) {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

My producer has:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

When I try this, I get no indication that the OperationCancelledException ever happened.

This question tries to explain the use of a cancellation token, but it seems like it's not using it correctly. (Argument: if it works, then it's "correct enough".) And this question would appear to be an exact duplicate of my question but with no example. (Same here.)

So to reiterate: How do I properly use a CancellationToken on BlockingCollection.GetConsumingEnumerable() with the caveat that I need to process the remaining items in the queue after it gets cancelled using a different method?

(I think my problem is centered around the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled. (StopFlag.IsCancellationRequested always equals false.))

Campus answered 11/11, 2013 at 17:10 Comment(3)
The GetConsumingEnumerable will wait for more items to be added. Inside the catch use the foreach on the collection itself.Impenitent
I'd love to try that, Romoku! How do I get to it? I used the simple ctor: public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> ();Campus
You need to check the cancellation token. CancellationToken.IsCancellationRequeste In your case if true then still process but log rather than process. msdn.microsoft.com/en-us/library/dd997289.aspxTippets
C
3

My problem was in how I was trying to cancel the operation. Instead of having my producer owning the CancellationTokenSource, I put it all in the consumer.

public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}
Campus answered 11/11, 2013 at 18:14 Comment(2)
Why do you have a GetConsumingEnumerable call in your OperationCanceledException handler? And furthermore, why doesn't this block the processingTask.Wait() call from finishing.Asha
msdn.microsoft.com/en-us/library/dd287086(v=vs.110).aspx says "After a collection has been marked as complete for adding, adding to the collection is not permitted and attempts to remove from the collection will not wait when the collection is empty." So i suppose that makes sense now, but you can also enumerate it as you would any IEnumerable in the catch block.Asha
T
5

When you pass in the CancellationToken to GetConsumingEnumerable it won't throw an exception of cancellation is requested, it'll just stop spitting out items. Rather than catching the exception just check the token:

foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }

Also note that, if cancellation is requested, and it's possible that CompletedAdding hasn't been called then you should just iterate the collection, not call GetConsumingEnumerable. If you know that the producer will complete adding when the operation is cancelled then that's not a problem.

Trichome answered 11/11, 2013 at 17:40 Comment(6)
I tried the if() statement both outside the loop (like you have it) and inside the loop. Neither one worked. I'm not even getting notification that the cancellation flag is doing anything.Campus
@Campus Then your token isn't being canceled, and that's your problem. Ensure that the token is actually being cancelled before you finish processing the items.Trichome
@Campus Based on your usage code, you're cancelling just before you finish adding. There's a good chance that both operations will be finished by the time the consuming code runs again, and if your consumer is faster than your producer (therefore it has no items left) it could very easily finish consuming all items before the cancellation token indicates it should be canceled. You should be working your example such that it's impossible for the consumer to finish consuming normally before being cancelled if that's the case you want to test.Trichome
I am. I have Sleep() calls simulating work. Each step of "processing" logs the item processed and the remaining number of items in the queue. (BlockingCollection.Count) My original code showed the count winding down to zero. Subsequent tests show the count simply stopping, but I never get my message of "Operation was cancelled"Campus
@Campus So you've verified that the collection isn't empty, but IsCancellationRequested is false, after the end of the foreach loop? I find that very odd. If that's the case, can you provide a complete code example to replicate the problem?Trichome
GetConsumingEnumerable WILL throw an exception when cancellation is requestedCruzcruzado
C
3

My problem was in how I was trying to cancel the operation. Instead of having my producer owning the CancellationTokenSource, I put it all in the consumer.

public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}
Campus answered 11/11, 2013 at 18:14 Comment(2)
Why do you have a GetConsumingEnumerable call in your OperationCanceledException handler? And furthermore, why doesn't this block the processingTask.Wait() call from finishing.Asha
msdn.microsoft.com/en-us/library/dd287086(v=vs.110).aspx says "After a collection has been marked as complete for adding, adding to the collection is not permitted and attempts to remove from the collection will not wait when the collection is empty." So i suppose that makes sense now, but you can also enumerate it as you would any IEnumerable in the catch block.Asha
C
0

I had the exact same problem. The BlockingCollection seemed to be deadlocked when I cancelled the procedure. The OperationCanceledException was not propagated to the calling method. I figured out that my Producer did not take the cancellation token in consideration and was therefore waiting for the queue to be consumed. All I had to do was to provide the cancellation token in the Add() method. To translate this into Jason's solution above, all I did was this:

public void OnOrderReceived (cOrder newOrder, CancellationToken cancellationToken) 
{
    myConsumer.orderQueue.Add (cOrder, cancellationToken);
}

The Process() method does not need any try-catch clause. However you need to throw if the process is cancelled:

private void Process () 
{
    foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) 
    {
        // process
        stopFlag.Token.ThrowIfCancellationRequested();
    }
}
Cothran answered 14/3, 2022 at 13:35 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.