Alternative to Dataflow BroadcastBlock with guaranteed delivery

I need an object that behaves like a BroadcastBlock but with guaranteed delivery, so I used an answer from this question. However, I don't really understand the execution flow here. I have a console app. Here is my code:

static void Main(string[] args)
{
    // Requires the TPL Dataflow library (System.Threading.Tasks.Dataflow).
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
    {
        int coef = i; // copy the loop variable: reading i inside the lambda would see 11 once the loop has finished
        blocks.Add(new ActionBlock<int>(num =>
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
        }, execopt));
    }

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        // offer each number to the blocks one at a time, in order
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    // when the broadcaster finishes, propagate completion to all targets
    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

Each number must be handled sequentially, so I can't use MaxDegreeOfParallelism in the broadcaster block. However, the ActionBlocks that receive the number can run in parallel.

So here is the question:

In the output I can see different thread IDs. Do I understand correctly that it works as follows:

Execution hits await block.SendAsync(num); in the broadcaster. If the current block is not ready to accept the number, execution leaves the broadcaster's delegate and the main thread keeps waiting at Task.WaitAll. When the block accepts the number, the rest of the foreach statement in the broadcaster runs on a thread-pool thread, and so on until the end: each iteration of the foreach may run on a thread-pool thread, but the iterations still happen sequentially.
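To make this concrete, here is a minimal sketch (an illustration with a deliberately slow consumer, not my actual code) of where the thread switch happens:

static async Task AwaitDemo()
{
    // a consumer that takes ~1s per item and buffers only a single item,
    // so the second SendAsync below cannot complete synchronously
    var slow = new ActionBlock<int>(
        async _ => await Task.Delay(1000),
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    Console.WriteLine("start:  " + Thread.CurrentThread.ManagedThreadId);

    await slow.SendAsync(1); // buffer empty: accepted synchronously, same thread
    Console.WriteLine("send 1: " + Thread.CurrentThread.ManagedThreadId);

    await slow.SendAsync(2); // buffer full: control returns to the caller here,
                             // and ~1s later this method resumes on a ThreadPool thread
    Console.WriteLine("send 2: " + Thread.CurrentThread.ManagedThreadId);
}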

Is my understanding right or wrong? And how can I change this code to send the number to all blocks asynchronously?

I want to make sure that if one of the blocks is not ready to receive the number at the moment, I don't wait for it and all the others that are ready receive the number right away; that all blocks can run in parallel; and that delivery is still guaranteed.

Formenti answered 1/8, 2014 at 14:4 Comment(0)

Assuming you want the broadcaster to handle one item at a time while letting the target blocks receive that item concurrently, you need to change the broadcaster to offer the number to all blocks at the same time and then asynchronously wait for all of them to accept it before moving on to the next number:

var broadcaster = new ActionBlock<int>(async num => 
{
    // offer the number to every block at once...
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    // ...and only move on once every block has accepted it
    await Task.WhenAll(tasks);
}, execopt);
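Note that each SendAsync task completes immediately whenever its target still has room in its bounded buffer, so the Task.WhenAll only actually waits when at least one block's buffer is full.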

Now, in this case, where there is no work after the await, you can optimize slightly while still returning an awaitable task:

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);
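
Since ActionBlock awaits the task returned by its delegate before taking the next item, this version behaves the same — the broadcaster still won't move on to the next number until every block has accepted the current one — but it skips the cost of an extra async state machine.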
Roasting answered 1/8, 2014 at 14:13 Comment(6)
Is it possible not to wait for all parsers to complete before moving on to the next number? I mean that whenever some parser's buffer has room, the broadcaster sends to it; that way I don't wait for the slowest one. I guess my explanation in the question was off: I need every parser to handle all the numbers in the order they are received, but I don't need to wait for all the parsers after each number. – Formenti
@ПавелБирюков When you call SendAsync and there's room in the target's buffer, the returned task completes immediately. You would only be waiting when one of the buffers is full. You can increase that buffer, but I wouldn't continue on without making sure the item moved to the next block. – Roasting
But when one of the buffers is full, can I skip waiting and still proceed with the next number to the blocks whose buffers have room? – Formenti
You can, but that could lead to a memory leak: you would need to store the extra numbers so you can send them to the full block later, which amounts to creating another buffer that you can't control. If memory isn't an issue you can simply increase the blocks' bounded capacity (or even make it unbounded), but that could fill up your RAM. – Roasting
How "guaranteed" are we going to get here? Everything in Dataflow is in memory. If there is any chance of a buffer overflow on a critical task, shouldn't we be using a more durable message queue? If not, or if it isn't critical, doesn't the buffering in the execution block types suffice? The mechanism outlined in this answer seems like too much/too little.Frostwork
@MarcL. TPL Dataflow is an in-memory library. If you need the data to survive a crash, you of course can't rely on it and should use a persistent queuing mechanism. As an aside, since .NET is managed, as long as you're not using unsafe code you can't get a buffer overflow (you can, however, crash with a stack overflow). – Roasting
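
For completeness, here is a minimal sketch of the trade-off discussed in these comments (an assumption, not part of the original answer): give each target an unbounded input buffer so the broadcaster never waits on a slow consumer. Each block still sees the numbers in the order they were sent, but buffer memory is now unbounded:

// Sketch only: unbounded targets never postpone a message, so Post
// succeeds immediately as long as the block hasn't been completed.
var unboundedOpt = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = DataflowBlockOptions.Unbounded // the default
};
// ... create the target blocks with unboundedOpt instead of execopt ...

var broadcaster = new ActionBlock<int>(num =>
{
    foreach (var block in blocks)
        block.Post(num); // returns true immediately; nothing to await
}, execopt);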
