I have one process generating work and a second process with a BlockingCollection<> that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but hadn't been consumed.
Right now, my consumer spawns a thread that runs a foreach (<object> in BlockingCollection.GetConsumingEnumerable()) loop. When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding(). What I find is that my consumer continues to process everything in the queue.
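A minimal sketch that reproduces this behavior, assuming a BlockingCollection<int> pre-filled with five items (the class and method names here are hypothetical, not from my real code). The consuming loop only ends once the collection is both marked complete and empty, so CompleteAdding() by itself does not stop the consumer early:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Returns the items the consumer processed even though
    // CompleteAdding() was called before it started.
    public static List<int> Run()
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < 5; i++)
        {
            queue.Add(i);
        }

        // The producer signals that no more items will be added.
        queue.CompleteAdding();

        var consumed = new List<int>();
        Task consumer = Task.Run(() =>
        {
            // Keeps yielding until the collection is completed AND empty.
            foreach (int work in queue.GetConsumingEnumerable())
            {
                consumed.Add(work);
            }
        });
        consumer.Wait();
        return consumed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run()));
    }
}
```

All five items are consumed, which matches what I see: CompleteAdding() only stops new additions and lets the loop end after the queue has drained.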
Googling the issue tells me that I need to use a CancellationToken. So I tried that out:
private void Process () { // This method runs in a separate thread
    try {
        foreach (var work in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCanceledException) {
        foreach (var work in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}
My producer has:
private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn its consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();
When I try this, I get no indication that an OperationCanceledException was ever thrown.
This question tries to explain the use of a cancellation token, but it seems like it's not using it correctly. (Argument: if it works, then it's "correct enough".) And this question would appear to be an exact duplicate of my question but with no example. (Same here.)
So to reiterate: How do I properly use a CancellationToken on BlockingCollection.GetConsumingEnumerable() with the caveat that I need to process the remaining items in the queue after it gets cancelled using a different method?
(I think my problem is centered around the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled: StopFlag.IsCancellationRequested always equals false.)
GetConsumingEnumerable will wait for more items to be added. Inside the catch use the foreach on the collection itself. – Impenitent
public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> (); – Campus
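The cancel-then-drain pattern under discussion could be sketched roughly as follows. This is only an illustration under stated assumptions, not a confirmed fix for the code in the question: the class name, the int payload, and cancelling from inside the loop after two items (to simulate the producer's stop request deterministically) are all hypothetical. The drain uses TryTake, which returns false immediately on an empty collection instead of waiting, as a variation on the suggestion to iterate the collection itself:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class CancelThenDrainDemo
{
    // Fills 'consumed' with items processed normally and 'logged'
    // with items that were still pending when cancellation hit.
    public static void Run(List<int> consumed, List<int> logged)
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < 5; i++)
        {
            queue.Add(i);
        }

        using (var stopFlag = new CancellationTokenSource())
        {
            try
            {
                // The token passed here must come from the same source
                // that Cancel() is later called on.
                foreach (int work in queue.GetConsumingEnumerable(stopFlag.Token))
                {
                    consumed.Add(work);

                    // Hypothetical stand-in for the producer's stop request.
                    if (consumed.Count == 2)
                    {
                        stopFlag.Cancel();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Non-blocking drain: remove and log whatever is left.
                while (queue.TryTake(out int pending))
                {
                    logged.Add(pending);
                }
            }
        }
    }

    public static void Main()
    {
        var consumed = new List<int>();
        var logged = new List<int>();
        Run(consumed, logged);
        Console.WriteLine("consumed: " + string.Join(",", consumed));
        Console.WriteLine("logged: " + string.Join(",", logged));
    }
}
```

In this sketch the first two items are consumed normally and the remaining three end up in the log list. Note that the exception type is spelled OperationCanceledException (one "l"), and that the token handed to GetConsumingEnumerable has to be the one belonging to the CancellationTokenSource that is actually cancelled.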