using TPL Dataflow, can I cancel all posts and then add one?
Asked Answered
D

3

7

With the TPL Dataflow library, I would like to do something like this:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);

It appears that the cancellation token on ActionBlock cancels the whole thing; I'd have to make a new ActionBlock if I set that one. Is it possible to do a partial cancellation with ActionBlock?

Posts that have not been processed yet should not be attempted. It would be nice if there was some cancellation token available to check in the currently-executing post.

Digitigrade answered 19/2, 2014 at 18:7 Comment(1)
I posted this a while back, but I have since made my own library; I have a "most recent" action queue in my Kts.ActorsLite library: github.com/BrannonKing/Kts.ActorsLiteDigitigrade
A
4

Take a look at BroadcastBlock<T>, which only holds the most recent item posted to it. You can put a broadcast block in front of an ActionBlock<T>.

While posting a new item to the broadcast block won't cancel the item currently being processed by the action block, it will overwrite any existing item already held by the broadcast block; in effect discarding any older messages not yet processed by the action block. When the action block completes its current item, it will take the most recent item posted to the broadcast block.

Aldarcie answered 24/2, 2014 at 2:4 Comment(0)
M
2

In addition to Monroe Thomas's answer it is important to understand that the ActionBlock following the BroadcastBlock needs it's BoundedCapacity limited to 1 or it will store and process every message of the broadcast block, even when it is still executing.
A code example goes here:

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber =>
{
  await Task.Delay(100);
  Console.WriteLine($">{ThisNumber}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null);
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true });

for(int IX = 0; IX < 128; IX++)
{
  await ThrottleBlock.SendAsync(IX);
  await Task.Delay(10);
}

This results in the following:

>0
>6
>12
>20
>27
>34
>41
>48
>55
>62
>68
>75
>82
>88
>95
>101
>108
>115
>122
>127

Enjoy!
-Simon

Minorca answered 2/1, 2018 at 14:56 Comment(1)
Good clarification! :)Aldarcie
F
1

There is nothing like this directly in TPL Dataflow, but I can see several ways how you could implement it yourself:

  1. If you don't need to be able to treat the modified block as a normal Dataflow block (e.g. no support for LinkTo()), then a simple way would to write a type that wraps ActionBlock, but whose items also contain a flag that says whether they should be processed. When you specify cancelAllPreviousPosts: true, all those flags are reset, so those items are going to be skipped.

    The code could look something like this:

    class CancellableActionBlock<T>
    {
        private class Item
        {
            public T Data { get; private set; }
            public bool ShouldProcess { get; set; }
    
            public Item(T data)
            {
                Data = data;
                ShouldProcess = true;
            }
        }
    
        private readonly ActionBlock<Item> actionBlock;
        private readonly ConcurrentDictionary<Item, bool> itemSet;
    
        public CancellableActionBlock(Action<T> action)
        {
            itemSet = new ConcurrentDictionary<Item, bool>();
            actionBlock = new ActionBlock<Item>(item =>
            {
                bool ignored;
                itemSet.TryRemove(item, out ignored);
    
                if (item.ShouldProcess)
                {
                    action(item.Data);
                }
            });
        }
    
        public bool Post(T data, bool cancelAllPreviousPosts = false)
        {
            if (cancelAllPreviousPosts)
            {
                foreach (var item in itemSet.Keys)
                {
                    item.ShouldProcess = false;
                }
                itemSet.Clear();
            }
    
            var newItem = new Item(data);
            itemSet.TryAdd(newItem, true);
            return actionBlock.Post(newItem);
        }
    
        // probably other members that wrap actionBlock members,
        // like Complete() and Completion
    }
    
  2. If you want to create something that's more composable and reusable, you could create a special block just for that cancellation. You could implement that using thee BufferBlocks linked together, where the third one would have capacity of 1 and the second one unlimited capacity. This way, almost all the queued items would be in the second block, so you could perform cancellation just by swapping that block for a new one. The whole structure would be represented by Encapsulate()ing the first and the third block.

    The issues with this approach is that the cancellation has a delay of 1 item (the one that's in the third block). Also, I didn't figure out a good interface for this.

Foreland answered 26/2, 2014 at 14:19 Comment(1)
I'm awarding the +50 here for the effort on the example. Thanks.Digitigrade

© 2022 - 2024 — McMap. All rights reserved.