Unexpected Behaviour - TPL Dataflow BatchBlock rejects items while TriggerBatch is executing

When you create a BatchBlock with bounded capacity and call TriggerBatch in parallel with posting new items, posting will fail while TriggerBatch is executing.

Calling TriggerBatch periodically is done to ensure that data isn't delayed in the block for too long in cases where the incoming data stream pauses or slows down.
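
For reference, a periodic trigger of this kind might look like the sketch below. This is an illustration on my part, not the code from my application; the 100 ms interval and the System.Threading.Timer approach are assumptions.

    // Minimal sketch of a time-based flush, assuming a 100 ms interval.
    // The timer fires regardless of traffic, so a partially filled batch
    // is emitted instead of waiting indefinitely for 10 items.
    var batchBlock = new BatchBlock<int>(10);
    var flushTimer = new System.Threading.Timer(
        _ => batchBlock.TriggerBatch(),
        state: null,
        dueTime: TimeSpan.FromMilliseconds(100),
        period: TimeSpan.FromMilliseconds(100));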

The following code outputs a number of "Failed to Post" events:

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void Main(string[] args)
    {
        // Bounded BatchBlock: batches of 10, with far more capacity than we post.
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock);

        // Post 10K items; with this capacity, Post should never lack room.
        var producerTask = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10000; i++)
            {
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            }
        });

        // Hammer TriggerBatch in parallel with the producer.
        var triggerBatchTask = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000000; i++)
                batchBlock.TriggerBatch();
        });

        producerTask.Wait();
        triggerBatchTask.Wait();
    }

    public static void ProcessBatch(int[] batch)
    {
        Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
    }

*Note that this scenario is reproducible only when the BatchBlock is bounded.

Am I missing something, or is this an issue with BatchBlock?

Drub asked 25/2, 2016 at 12:12

The BatchBlock does not really reject the item; it attempts to postpone it. Except that in the case of Post(), postponing is not an option. A simple way to fix this would be to use await batchBlock.SendAsync(i) instead of batchBlock.Post(i) (this also means you need to change Task.Factory.StartNew(() => to Task.Run(async () =>)).
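
A minimal sketch of the producer loop rewritten that way, assuming the same 10K loop as in the question (only the posting mechanism changes):

    // SendAsync returns a Task<bool> that completes with true once the block
    // accepts the item, even if the block had to postpone it first; Post()
    // would have returned false in that postponed case.
    var producerTask = Task.Run(async () =>
    {
        for (int i = 0; i < 10000; i++)
        {
            var accepted = await batchBlock.SendAsync(i);
            if (!accepted)
                Console.WriteLine("Failed to Send"); // only if the block declined permanently
        }
    });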

Why does this happen? According to the source code, if the BatchBlock is bounded, TriggerBatch() is processed asynchronously, and while it is being processed, no new items are accepted.

In any case, you shouldn't expect Post() to always return true on a bounded block; if the block is full, Post() will also return false.
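
To illustrate with a minimal sketch of my own (BatchBlock requires BoundedCapacity to be at least the batch size, so a batch size of 2 with capacity 2 is used here; I would expect the output shown in the comments):

    // A bounded block that is full declines Post() immediately. The bounded
    // count includes batches that no target has consumed yet, so with nothing
    // linked to the block it simply fills up.
    var small = new BatchBlock<int>(2, new GroupingDataflowBlockOptions { BoundedCapacity = 2 });
    Console.WriteLine(small.Post(1)); // True
    Console.WriteLine(small.Post(2)); // True  (a batch of 2 forms, but nothing consumes it)
    Console.WriteLine(small.Post(3)); // False (capacity reached, and Post() won't postpone)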

Cardigan answered 25/2, 2016 at 17:38 Comment(5)
Meanwhile I'm using a different solution: introducing another block that accepts the failed items, and eventually calling TriggerBatch in a serial manner on both blocks. As for your suggested solution: await and async will create a task to handle each incoming item, which might cause out-of-memory issues; with a huge burst of events, many tasks would be created without bound.Drub
@AlYaros No, it won't. If the item is accepted, you get a cached Task, so no allocations there. And if the item is postponed, the code you showed won't add new items until it's accepted. If in your actual code await would cause issues, then IMO either you should be able to fix them, or you're going to get issues even without it.Cardigan
By the way, thanks for your comments :) I'm not sure that this is absolutely safe in terms of task memory consumption. I'll take a look at the source code you suggested and test it a bit. Isn't the task created before the block code is executed, regardless of the post result?Drub
I would also expect the Post to be accepted if the bounded capacity hasn't been reached, regardless of the TriggerBatch processing.Boff
@Cardigan From the source code, TriggerBatch and OfferMessage both execute under lock(IncomingLock). Other than that, what prevents Post() from succeeding while TriggerBatch() is executing? I ran some tests and saw that with a batch trigger timeout of 1000ms, Post() returns false far fewer times than with a lower value like <100ms.Shrive
