tpl dataflow: fixed buffer size without throwing items away
Asked Answered
I

1

9

After playing around with dataflow I encountered a new problem. I would like to limit the inputqueue of all blocks. My producingblock (ActionBlock) is creating 5000 elements really fast and posts them to an broadcastblock. So if i set the BoundedCapacity of the broadcastblock to 100 he throws a lot of data away. But I would prefer the producingblock to wait for new slots in the inputqueue of my bufferblock.

Is there any way to get rid of this problem?

Inheritor answered 17/9, 2013 at 9:54 Comment(1)
Related: BroadcastBlock with Guaranteed Delivery in TPL DataflowEsperanto
L
8

That's exactly what BufferBlock is for. If you set its BoundedCapacity and it gets full, it will postpone receiving any messages until someone consumes them. This means that for example Post() will block and SendAsync() will return an unfinished Task.

EDIT: There is no built-in block that sends to multiple targets and never throws data away. But you can easily build one yourself from ActionBlock and sending loop:

static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
{
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

    // TODO: propagate completion from block to targets

    return block;
}

This code assumes that you don't need to clone the data for each target and that the list of targets never changes. Modifying the code for that should be fairly simple.

Lochia answered 17/9, 2013 at 12:3 Comment(4)
First of all - thanks. I was absolutly blind ... . But I need a block, which can send messages to multiple recievers. Thats not possbile with a bufferblock. Any idea how to solve this?Inheritor
What should happen when one of the receivers is slow? Should the other receivers be required to wait for it? (I'm assuming that the receivers also have BoundedCapacity set, otherwise, setting BoundedCapacity on the buffer would basically have no effect.)Lochia
yes, BoundedCapacity is always to the same value. Its also true, that the other recievers need to wait for the slow operation.Inheritor
In my experience, Post() does not block when called on a BufferBlock filled to capacity.Stonewort

© 2022 - 2024 — McMap. All rights reserved.