How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize?

Using Dataflow CTP (in the TPL)

Is there a way to call BatchBlock.TriggerBatch automatically if the number of currently queued or postponed items is less than the BatchSize, after a timeout?

Even better: the timeout should be reset each time the block receives a new item.

Windtight answered 23/2, 2012 at 19:1 Comment(0)

Yes, you can accomplish this rather elegantly by chaining blocks together. In this case you want to set up a TransformBlock which you link "before" the BatchBlock. That would look something like this:

Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);
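For concreteness, here is a sketch of the same chain with int items, assuming the usual System.Threading and System.Threading.Tasks.Dataflow usings (the block names and the 10-item/5-second values are illustrative, not part of the answer above):

var bufferBlock = new BufferBlock<int>();
var batchBlock = new BatchBlock<int>(10); // illustrative batch size

// Created unstarted; when it fires, it flushes whatever the BatchBlock holds.
var triggerBatchTimer = new Timer(_ => batchBlock.TriggerBatch());

var timeoutTransformBlock = new TransformBlock<int, int>(value =>
{
    // Restart the 5-second countdown on every item that passes through.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return value;
});

bufferBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Batch of {batch.Length} items")));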
Loudish answered 24/2, 2012 at 0:52 Comment(0)

Here is a polished version of Drew Marsh's idea. This implementation uses the DataflowBlock.Encapsulate method to create a dataflow block that encapsulates the timer+batch functionality. Beyond the new timeout argument, the CreateBatchBlock method also supports all options available to the normal BatchBlock constructor.

public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
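A minimal usage sketch of this helper (the numbers and item type are illustrative): items are batched in groups of 10, and a leftover partial batch is flushed by the timer about 500 ms after the last posted item.

var batchBlock = CreateBatchBlock<int>(batchSize: 10, timeout: 500);
var consumer = new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Received a batch of {batch.Length} items"));
batchBlock.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 25; i++) batchBlock.Post(i);
// Two full batches of 10 are emitted immediately; the remaining 5 items
// are flushed by the timer roughly 500 ms after the last Post.
Thread.Sleep(1000);
batchBlock.Complete();
consumer.Completion.Wait();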

Caution: As pointed out by @Jeff in a comment, there is a race condition with this approach. If the timeout is very small (in the range of milliseconds), the transformBlock will be racing with the timer to pass the data on to the batchBlock, and the timer may fire before the batchBlock has anything in it yet. Worst case scenario, we hang indefinitely: no more messages end up in the queue because they are waiting on a handful of previous ones to complete, but there is one straggler sitting in the latest buffer that will never trigger.


Alternative: below is a BatchUntilInactiveBlock<T> class that offers the full range of the BatchBlock<T> functionality. This implementation is a thin wrapper around a BatchBlock<T> instance. It has less overhead than the CreateBatchBlock implementation above, while behaving similarly, and it is not affected by the race condition mentioned earlier.

/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
            => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}

The _timer is (re)scheduled inside the OfferMessage implementation, immediately after the internal BatchBlock<T> has accepted a message. The message is therefore already inside the BatchBlock<T> by the time the timer is scheduled, so the timer can never fire before the block contains the item, and there is no race.
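A minimal usage sketch (the batch size, timeout and item type are illustrative):

var block = new BatchUntilInactiveBlock<string>(batchSize: 100,
    timeout: TimeSpan.FromSeconds(5));
var consumer = new ActionBlock<string[]>(batch =>
    Console.WriteLine($"Saving {batch.Length} records"));
block.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

block.Post("record-1");
block.Post("record-2");
// With no further activity, these two records are emitted as a short
// batch about 5 seconds after the last Post.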


Disclaimer: The behavior of the above implementations is not ideal, in that they produce short batches even in cases where they shouldn't. The ideal behavior would be to produce a short batch only when the batch can be propagated instantly to a consumer downstream; producing short batches and just storing them in the block's output buffer doesn't make much sense. This deviation from the ideal behavior can only be observed if the CreateBatchBlock<T>/BatchUntilInactiveBlock<T> is not rigorously pumped, for example if the linked downstream block is bounded and has reached its maximum capacity.

Hyoscyamine answered 4/12, 2019 at 9:2 Comment(13)
Amazing of you to come and add the alternative implementation 18 months later! I'm guessing this means you have probably been using this yourself for a while... do you also happen to have already written one that uses the "release messages after at most x time" semantics, rather than the "reset if more messages arrive within x time" semantics? I'm not confident I understand the internal plumbing of the interfaces well enough to dare try it myself with your wrapper approach rather than the encapsulate approach (which is what I am using now).Recriminate
@Recriminate actually someone upvoted my answer, so I gave it a fresh look searching for improvements. :-) Regarding a custom BatchBlock that resets its timer when the first item in a batch is received, I found this question: TPL DataFlow - Batching on duration or threshold. It has some answers by knowledgeable StackOverflow members. I may try to improve on these answers when I have time.Hyoscyamine
Haha yes, that was me. I have indeed seen that thread, but the completely self contained, "dataflow-pure" solutions you have here are the best on the net (MHO). The encapsulation approach works nicely and I am entirely happy with it, whatever additional overhead it has is not even close to being a bottleneck for my particular use. In fact aesthetically I still prefer it, the lego approach is the natural fit.Recriminate
@Recriminate yes, the DataflowBlock.Encapsulate approach is convenient and usually much shorter than implementing the IPropagatorBlock<TInput,TOutput> interface directly. On the other hand including additional functionality, beyond what the IPropagatorBlock interface offers, becomes very awkward (you must use out Action parameters or something). Also having the IReceivableSourceBlock<TOutput> interface available is handy sometimes. For example it allows converting a dataflow block to an IAsyncEnumerable<TOutput> sequence, as shown here.Hyoscyamine
@Recriminate I changed the name of the class to BatchUntilInactiveBlock<T>, because the original name (TimeoutBatchBlock<T>) is more suitable IMHO for the other behavior (activating the timer when the first item arrives, not the last).Hyoscyamine
I am pleased with my decision not to attempt that myself. Yikes! I knew there'd be something tricky ;)Recriminate
@Recriminate yeap, the TPL Dataflow is not easily extensible. It needs a lot of code to do simple stuff. That's the main weakness of this library IMHO.Hyoscyamine
Hi @allmhuran. A correction. As I learned today, the DataflowBlock.Encapsulate returns an IPropagatorBlock<TInput,TOutput> implementation that also implements the IReceivableSourceBlock<TOutput> interface (source code). It's not obvious, but if you cast it ((IReceivableSourceBlock<TOutput>)encapsulatedBlock) the cast will succeed. This makes the DataflowBlock.Encapsulate approach a bit more appealing, since it eliminates one of its presumed disadvantages.Hyoscyamine
Very admirable of you to come back and keep improving the information on this (and other) topics! This is the only time I have ever looked at using Encapsulate for anything and so having not really explored it thoroughly at all I absolutely did not realise that. Useful for sure.Recriminate
Won't the "easier" approach be susceptible to a race condition where a batch never gets processed after timeout? The TransformBlock will be racing with the timer to pass the data onto the BatchBlock, and the Timer may elapse first while the BatchBlock has nothing in it yet.Exude
@Exude you mean if the timeout is very small, in the range of milliseconds? Yea, there might be a race condition in this case. Worst case scenario: the timeout policy will be violated. Eventually all messages will be processed though, when the block completes. My opinion is, for timeout greater than one second, both implementations are fine. For timeout less than one second, prefer the second implementation to be on the safe side.Hyoscyamine
worst case actually, we hang indefinitely... no more messages end up in the queue because they are waiting on a handful of previous ones to complete, but there is one straggler sitting in the latest buffer that will never trigger.Exude
but yes, in the range of 30 milliseconds - the scenario is an automatic (read middleware) HTTP request batching implementation for MS Graph API calls that uses their $batch endpoint to batch HTTP requests. A delegating handler adds to the buffer block and then batches any messages (up to 15 or within 50ms of adding to the block) to the $batch endpoint, then returns the HTTP Responses to the original delegating handlers. I never want to incur more than 30 milliseconds of performance hit to the request to reap the benefit of batching. The second implementation is safe because it offers firstExude

Thanks to Drew Marsh for the idea of using a TransformBlock which greatly helped me with a recent solution. However, I believe that the timer needs to be reset AFTER the batch block (i.e. after it has either been triggered by the batch size being reached OR the TriggerBatch method being explicitly called within the timer callback). If you reset the timer every time you get a single item then it can potentially keep resetting several times without actually triggering a batch at all (constantly pushing the "dueTime" on the Timer further away).

This would make the code snippet look like the following:

Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
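As a self-contained illustration of this ordering (the names and the 10-item/5-second values are assumptions for the sketch, as in the answers above), the full chain could look like:

var bufferBlock = new BufferBlock<int>();
var batchBlock = new BatchBlock<int>(10); // illustrative batch size

// The timer starts immediately and fires once, 5 seconds from now.
var triggerBatchTimer = new Timer(_ => batchBlock.TriggerBatch(),
    null, 5000, Timeout.Infinite);

// This transform sits AFTER the batch block, so the countdown restarts
// only when a batch has actually been emitted.
var timeoutTransformBlock = new TransformBlock<int[], int[]>(batch =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return batch;
});

var actionBlock = new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Processing {batch.Length} items"));

bufferBlock.LinkTo(batchBlock);
batchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(actionBlock);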
Dramatist answered 6/12, 2019 at 16:47 Comment(2)
Both timing approaches make sense for different scenarios.Hyoscyamine
I think this approach on its own has a flaw: 1. Post n<batchSize work items, the timer will trigger processing of the block; 2. The timer will be triggered once more after the batch is processed; 3. If a work item is posted after the timer in Step 2 has elapsed, it will sit waiting until there are n>=batchSize work items to process.Gigantopithecus

Here is a solution that builds upon the previous answers. This method wraps an existing BatchBlock with one that pushes out batches at least as often as the timeout.

The other answers don't handle the case where there are no items in the batch block when the timer fires; in that case those solutions wait until the batch is full. We had that issue in our non-production environments, which made testing harder. This solution makes sure that after an item is posted to the BatchBlock, it is propagated after at most timeout seconds.

public static IPropagatorBlock<T, T[]> CreateTimeoutBatchBlock<T>(BatchBlock<T> batchBlock, int timeout)
{
    var timespan = TimeSpan.FromSeconds(timeout);
    var timer = new Timer(
        _ => batchBlock.TriggerBatch(),
        null,
        timespan,
        timespan);
    var transformBlock = new TransformBlock<T[], T[]>(
        value =>
        {
            // Reset the timer when a batch has been triggered
            timer.Change(timespan, timespan);
            return value;
        });
    batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    return DataflowBlock.Encapsulate(batchBlock, transformBlock);
}
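A minimal usage sketch (illustrative values; note that the helper wraps an existing BatchBlock and takes the timeout in seconds):

var inner = new BatchBlock<int>(10);
var block = CreateTimeoutBatchBlock(inner, timeout: 2);
var consumer = new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Got {batch.Length} items"));
block.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

block.Post(1);
block.Post(2);
// Even though the batch is not full, the periodic timer pushes these
// items out within at most ~2 seconds.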
Plaice answered 1/9, 2023 at 12:38 Comment(2)
This is indeed safe from race conditions, but it has the downside that the timer is firing repeatedly, even when the pipeline is idle. To be honest I have started questioning if the TPL Dataflow is a robust tool for this kind of work (or any kind of work for that matter). Its components are too loosely connected with each other.Hyoscyamine
I think the timer firing repeatedly is a small downside, since it basically does nothing if the queue is empty. But I agree with your other point, TPL has worked best for me when the workflows have fit into existing constructs.Plaice

You can use link options:

_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});
Bartolommeo answered 4/12, 2019 at 7:21 Comment(0)
