How should I use DataflowBlockOptions.CancellationToken?

How could I use DataflowBlockOptions.CancellationToken?

If I create an instance of BufferBlock like this:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

and then have producer/consumer methods that use the queue, how can I use its CancellationToken to handle cancellation?

For example, in the producer method, how can I check the cancellation token? I haven't found any property that exposes the token.

EDIT: Sample of produce/consume methods:

private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        await queue.SendAsync(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

Code to call it:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

// Start the producer and consumer.
var values = Enumerable.Range(0, 10);
Produce(queue, values);
var consumer = Consume(queue);

// Wait for everything to complete.
await Task.WhenAll(consumer, queue.Completion);

EDIT2:

If I call _cts.Cancel(), the Produce method does not cancel and finishes without interruption.

Firebox answered 20/4, 2015 at 12:38 Comment(4)
What do you mean by "how can I use"? If you want to cancel the operation, you can call .Cancel on the _cts object (assuming it's a CancellationTokenSource). Of course, you can always check whether cancellation has been requested with IsCancellationRequested. – Bernardina
I can pass a cancellation token into the BufferBlock constructor (through DataflowBlockOptions). But if I then pass this BufferBlock instance into some library's produce method, that method has no way to access the token from the BufferBlock. So what is the use of this? – Firebox
Can you give a complete example? In that case you can also just pass the token to the Task you are going to create at the end; see here for an introduction. – Bernardina
@CarstenKönig see my edited question. I'm specifically interested in how I could use the token passed in DataflowBlockOptions.CancellationToken, or what the purpose is of passing it into the BufferBlock constructor. – Firebox

Normally you use the CancellationToken option in order to control the cancellation of a dataflow block, using an external CancellationTokenSource. Canceling the block (assuming that it's a TransformBlock) has the following immediate effects:

  1. The block stops accepting incoming messages. Invoking its Post method returns false, meaning that the offered message is rejected.
  2. The messages that are currently stored in the block's internal input buffer are immediately discarded. These messages are lost. They will not be processed or propagated.

If the block is not currently processing any messages, the following effects also occur immediately. Otherwise they occur once the processing of all the messages currently being processed has completed:

  1. All the processed messages that are currently stored in the block's output buffer are discarded. The last processed messages (the messages that were in the middle of processing when the cancellation occurred) will not be propagated to linked blocks downstream.
  2. Any pending asynchronous SendAsync operations targeting the block that were in flight when the cancellation occurred will complete with a result of false (meaning "not accepted").
  3. The Task that represents the Completion of the block transitions to the Canceled state. In other words, this task's IsCanceled property becomes true.
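A minimal sketch of the immediate and final effects described above, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced. The `CancellationDemo` class and its `RunAsync` method are hypothetical names used only for illustration, and the `Task.Delay` inside the transform exists only to keep messages waiting in the input buffer when `Cancel` is called.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical demo class; the names are illustrative only.
public static class CancellationDemo
{
    // Reports whether a Post after cancellation was accepted,
    // and whether the Completion task ended up in the Canceled state.
    public static async Task<(bool postedAfterCancel, bool isCanceled)> RunAsync()
    {
        var cts = new CancellationTokenSource();
        var block = new TransformBlock<int, int>(async x =>
        {
            await Task.Delay(100); // slow processing keeps the other messages in the input buffer
            return x * 2;
        }, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

        for (int i = 0; i < 10; i++) block.Post(i);

        cts.Cancel();                 // buffered input messages are discarded
        bool posted = block.Post(42); // rejected: the block no longer accepts messages

        try
        {
            await block.Completion;   // throws, because Completion is Canceled
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException.
        }

        return (posted, block.Completion.IsCanceled);
    }

    public static async Task Main()
    {
        var (posted, isCanceled) = await RunAsync();
        Console.WriteLine($"Post after cancel accepted: {posted}");
        Console.WriteLine($"Completion.IsCanceled: {isCanceled}");
    }
}
```

Awaiting the canceled Completion surfaces as an exception, which is why the sketch inspects IsCanceled only after catching it.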

You can achieve all but the last effect directly, without using the CancellationToken option, by invoking the block's Fault method. This method is accessible through the IDataflowBlock interface that all blocks implement. You can use it like this:

((IDataflowBlock)block).Fault(new OperationCanceledException());

The difference is that the Completion task will now become Faulted instead of Canceled. This difference may or may not be important, depending on the situation. If you just await the Completion, which is how this property is normally used, an OperationCanceledException will be thrown in both cases. So if you don't need to do anything fancy with the Completion property, and you also want to avoid configuring the CancellationToken for some reason, you could consider this trick as an option.
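The Fault trick can be sketched like this, assuming the System.Threading.Tasks.Dataflow NuGet package and using a BufferBlock<int> as the example target. `FaultDemo` and `RunAsync` are hypothetical names; the cast is needed because the built-in blocks implement Fault explicitly through IDataflowBlock.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical demo class: "cancels" a block by faulting it, with no CancellationToken configured.
public static class FaultDemo
{
    public static async Task<(bool postedAfterFault, TaskStatus status, bool threwOce)> RunAsync()
    {
        var block = new BufferBlock<int>();
        block.Post(1);
        block.Post(2);

        // Fault is an explicit IDataflowBlock member, hence the cast.
        ((IDataflowBlock)block).Fault(new OperationCanceledException());

        bool posted = block.Post(3); // rejected, just like after a cancellation

        bool threw = false;
        try
        {
            await block.Completion;
        }
        catch (OperationCanceledException)
        {
            threw = true; // awaiting rethrows the exception that was passed to Fault
        }

        return (posted, block.Completion.Status, threw);
    }

    public static async Task Main()
    {
        var (posted, status, threw) = await RunAsync();
        Console.WriteLine($"Post after Fault accepted: {posted}");
        Console.WriteLine($"Completion.Status: {status}");
        Console.WriteLine($"await threw OperationCanceledException: {threw}");
    }
}
```

From the point of view of code that simply awaits Completion, the outcome looks like a cancellation, even though Status reports Faulted.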


Update: Behavior when the cancellation occurs after the Complete method has been invoked, in other words when the block is already in its completion phase, but has not completed yet:

  1. If the block is a processing block, like a TransformBlock, all of the above will happen just the same. The block will soon transition to the Canceled state.
  2. If the block is a non-processing block, like a BufferBlock<T>, the discarding of the output buffer (the first effect in the second list above) will not happen: the output buffer of a BufferBlock<T> is not emptied when the cancellation happens after the Complete method has been invoked. See this GitHub issue for a demonstration of this behavior. Keep in mind that the Complete method may be invoked not only manually, but also automatically, if the block has been linked as the target of a source block with the PropagateCompletion option enabled. You may want to check out this question to fully understand the implications of this behavior. Long story short, canceling all the blocks of a dataflow pipeline that contains a BufferBlock<T> does not guarantee that the pipeline will terminate.

Side note: When both the Complete and Fault methods are invoked, whichever was invoked first determines the final status of the block. If Complete was invoked first, the block will complete with status RanToCompletion. If Fault was invoked first, the block will complete with status Faulted. Faulting a completed block still has an effect, though: it empties its internal input buffer.
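A small sketch of the ordering rule in the side note, assuming the System.Threading.Tasks.Dataflow NuGet package and an empty BufferBlock<int>. `CompletionOrderDemo` and `RunAsync` are hypothetical names, and InvalidOperationException is just an arbitrary exception chosen for the Fault call.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical demo class: whichever of Complete/Fault is called first decides the final status.
public static class CompletionOrderDemo
{
    public static async Task<TaskStatus> RunAsync(bool completeFirst)
    {
        var block = new BufferBlock<int>();
        IDataflowBlock asBlock = block; // Fault is only reachable through the interface

        if (completeFirst)
        {
            block.Complete();
            asBlock.Fault(new InvalidOperationException("too late")); // does not change the status
        }
        else
        {
            asBlock.Fault(new InvalidOperationException("first"));
            block.Complete(); // ignored: the block is already completing with a fault
        }

        try
        {
            await block.Completion;
        }
        catch (InvalidOperationException)
        {
            // Only reached in the Fault-first case.
        }

        return block.Completion.Status;
    }

    public static async Task Main()
    {
        Console.WriteLine($"Complete, then Fault: {await RunAsync(completeFirst: true)}");
        Console.WriteLine($"Fault, then Complete: {await RunAsync(completeFirst: false)}");
    }
}
```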

Incongruent answered 31/8, 2020 at 20:31 Comment(6)
Extremely important information. If this isn't in the docs, it should be. I didn't find this info anywhere but here. – Slaty
@RonnieOverby indeed, these behaviors are not documented. Personally I went through a period of enthusiasm with the TPL Dataflow library, before eventually becoming disappointed by its obscure API, inadequate extensibility, general looseness, and the bug. – Incongruent
Gross... I didn't know about that bug. That's a real shame... I was willing to put up with the obscurity of the API to get the gains the model provides. But not now. I've reached for dataflow a few times in the past, but for my current efforts I think I'll punt and use a different approach. I want my service to be unstoppable. Just not in that way. – Slaty
@RonnieOverby the problem is that, in case an error occurs, a dataflow pipeline is not guaranteed to complete. As long as every block is running smoothly, the pipeline is unstoppable. But for me, terminating gracefully in case of an error is very important, and TPL Dataflow doesn't guarantee that (because of the bug). For simple pipelines I would be tempted to go with custom code, like in this answer (the ParallelizeTwoActions method). – Incongruent
I don't use DF for simple pipelines, since I have experience with the alternatives: imperative code that manages threads/tasks, Parallel.*, PLINQ, and the even more obscure Rx. A decent BCL-native stand-in for DF would use regular looping code or Parallel.For[Each][Async] calls that read from and write to System.Threading.Channels. Those primitives would involve more code for sure, but at least they are easier to understand, give clarity to the reader, and can be arranged similarly to DF blocks, without a lot of nesting or the appearance of call stack depth. – Slaty
@RonnieOverby Rx doesn't have good error behavior either. It leaks fire-and-forget operations. And the same can easily happen with any Parallel/channels custom code if you are not careful. – Incongruent

If you want to cancel the producing process, you should pass the token into it, like this:

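    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

    class Program
    {
        private static async Task Produce(
            BufferBlock<int> queue,
            IEnumerable<int> values,
            CancellationToken token)
        {
            foreach (var value in values)
            {
                // The awaited task is canceled (and throws) if the token is canceled;
                // a false result means the block declined the value, e.g. because it was canceled.
                if (!await queue.SendAsync(value, token)) break;
                Console.WriteLine(value);
            }

            queue.Complete();
        }

        private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
        {
            var ret = new List<int>();
            while (await queue.OutputAvailableAsync())
            {
                // TryReceive instead of ReceiveAsync: if the block is canceled between the two
                // calls, its buffered items are discarded and ReceiveAsync would throw.
                while (queue.TryReceive(out var item))
                {
                    ret.Add(item);
                }
            }

            return ret;
        }

        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();

            var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

            // Start the producer and consumer.
            var values = Enumerable.Range(0, 100);
            var producer = Produce(queue, values, cts.Token);
            var consumer = Consume(queue);

            cts.Cancel();

            try
            {
                // Throws an AggregateException if any of the tasks was canceled
                // (queue.Completion normally is, since the block was canceled).
                Task.WaitAll(producer, consumer, queue.Completion);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }

            foreach (var i in consumer.Result)
            {
                Console.WriteLine(i);
            }

            Console.ReadKey();
        }
    }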
Wherever answered 21/4, 2015 at 7:52 Comment(2)
If you modify Produce to output the value in the loop, you can see that canceling the operation does not prevent the producer from sending to the queue. My expectation was that the line await queue.SendAsync(i); would throw a TaskCanceledException. Moreover, Produce() cannot access the CancellationToken of the queue to check its state and cancel itself. From this, it seems it's better to pass the cancellation token directly to the Produce/Consume methods rather than relying on DataflowBlockOptions. Still, I'm not sure if I'm missing something. – Firebox
@FilipHurta Perhaps SendAsync doesn't check for cancellation because the sending side and the receiving side should share as few resources as possible, or for performance reasons. – Wherever

© 2022 - 2024 — McMap. All rights reserved.