Batching on duration or threshold using TPL Dataflow

I have implemented a producer-consumer pattern using TPL Dataflow. The code reads messages from a Kafka bus. For efficiency, we need to write messages to the database in batches.

Is there a way in TPL Dataflow to hold on to messages and fire whenever a size or duration threshold is hit?

For example, the current implementation posts each message as soon as it is pulled from the queue:

    postedSuccessfully = targetBuffer.Post(msg.Value);
Alsatia answered 3/10, 2018 at 18:15 Comment(4)
Use BatchBlock. The BatchBlock will collect messages until a batch size is reached, then emit a collection of messages for downstream processing. - Allisan
Thanks. BatchBlock will collect messages. I also need to emit messages if a certain time threshold is reached. Is there an option to specify max messages or a timeout threshold? - Alsatia
There's no out-of-the-box timeout, but you could empty it with a timer. There are options for max groups and capacity, which might help with your other requirements. - Allisan
@AshishBhatia why not use Reactive Extensions instead? Buffer allows batching by count or timespan, e.g. mySequence.Buffer(TimeSpan.FromSeconds(1)). - Amice

Buffering by count and duration is already available through System.Reactive, specifically the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires.

Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods.

This makes building a buffering block very easy:

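// Requires the System.Reactive and System.Threading.Tasks.Dataflow packages.
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    // Input side: receives individual items. Output side: holds finished batches.
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)                 // emit on timeout or when count is reached
            .ObserveOn(TaskPoolScheduler.Default)    // deliver batches on a Task pool thread
            .Subscribe(outObserver);

    // Expose the two blocks as a single propagator block
    return DataflowBlock.Encapsulate(inBlock, outBlock);
}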

This method uses two buffer blocks to buffer inputs and outputs. Buffer() reads from the input block (the observable) and writes to the output block (the observer) when either the batch is full or the timespan expires.

By default, Rx works on the current thread. By calling ObserveOn(TaskPoolScheduler.Default) we tell it to process data on a Task pool thread.

Example

This code creates a buffer block for 5 items or 1 second. It starts by posting 7 items, waits 1.1 seconds, then posts another 7 items. Each batch is written to the console together with the thread ID:

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    //Wait until the last batch has been printed, not just handed off
    await printBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

The output is:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
Amice answered 6/11, 2018 at 13:10 Comment(3)
Is it possible to wait for the outBlock to be empty before writing more data? - Dehumidifier
It's already available. You can specify a bound on any block through the BoundedCapacity option and use await block.SendAsync to post to it. SendAsync will wait asynchronously if the block is full; Post will return false in the same situation. If you set BoundedCapacity=1, the block will publish a new buffer only when the previous one has been processed. - Amice
It's a great use of System.Reactive tbh, love the simplicity of building a missing feature (imo) in the TPL world. - Commandant
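
The BoundedCapacity and SendAsync behaviour described in the comment above can be sketched roughly as follows. This is only an illustration under assumptions, not code from the answer: the BoundedCapacityDemo class, the RunAsync method and the 50 ms delay are hypothetical, and it uses a plain ActionBlock rather than the CreateBuffer block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the original answer.
public static class BoundedCapacityDemo
{
    // Sends `count` items through a block that holds at most one item at a time
    // and returns them in the order they were processed.
    public static async Task<List<int>> RunAsync(int count)
    {
        var processed = new List<int>();

        var slowConsumer = new ActionBlock<int>(async item =>
        {
            await Task.Delay(50);   // simulate slow downstream work
            processed.Add(item);    // no lock needed: MaxDegreeOfParallelism defaults to 1
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        for (int i = 0; i < count; i++)
        {
            // SendAsync waits asynchronously until the block has room.
            // Post would return false instead while the block is full.
            await slowConsumer.SendAsync(i);
        }

        slowConsumer.Complete();
        await slowConsumer.Completion;
        return processed;
    }
}
```

Calling `await BoundedCapacityDemo.RunAsync(3)` should process the three items one after another, with the producer loop pausing at SendAsync while the consumer is busy.
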

While there is no out-of-the-box timeout, you can wire up a timer to call TriggerBatch whenever the downstream pipeline has waited long enough for a batch, then reset the timer whenever a batch flows through. The BatchBlock will take care of the rest for you.

For example, this sample has been configured to produce a batch of size 1 every time, even though the BatchBlock would normally wait for 10 elements. The timeout forces out whatever is currently stored in the BatchBlock.

public class BatchBlockExample
{
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        var batchBlock = new BatchBlock<int>(10);

        var timeOut = TimeSpan.FromSeconds(1);
        var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds);
        timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();            

        var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
        {
            //Reset the timeout since we got a batch
            timeOutTimer.Stop();
            timeOutTimer.Start();
            Console.WriteLine($"Batch Size: {x.Count()}");
        });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        timeOutTimer.Start();

        foreach(var item in Enumerable.Range(0, 5))
        {
            await Task.Delay(2000);
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

Output:

Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Allisan answered 3/10, 2018 at 20:34 Comment(2)
Seems like a good solution, but it can't run concurrently: every ActionBlock invocation stops and restarts the timer, so you'll have an issue with the defined timeout. - Commandant
@Commandant good callout, the ActionBlock would need to be limited to a MaxDegreeOfParallelism of 1 in this case. - Allisan
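
One possible way to soften the concern raised in these comments is to restart the countdown with a single System.Threading.Timer.Change call instead of a separate Stop and Start. The sketch below is an assumption on my part, not something from the answer: the BatchTimeout class and its Reset method are hypothetical names, and it only addresses the timer reset, not how batches are processed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the original answer.
public sealed class BatchTimeout<T> : IDisposable
{
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchTimeout(BatchBlock<T> batchBlock, TimeSpan timeout)
    {
        _timeout = timeout;
        // Created disarmed (infinite due time); Reset() arms it.
        _timer = new Timer(_ => batchBlock.TriggerBatch(), null,
                           Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    // Restarts the countdown in one call. The timer then keeps firing
    // every `timeout` until Reset() is called again or it is disposed.
    public void Reset() => _timer.Change(_timeout, _timeout);

    public void Dispose() => _timer.Dispose();
}
```

In the sample above, the ActionBlock delegate would call Reset() once per batch in place of the Stop/Start pair, and Reset() would also be called once after linking to start the first countdown.
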

You could use something like this. Basically, it's just a BatchBlock with a timeout, all rolled into one.

BatchBlockEx

public sealed class BatchBlockEx<T> : IDataflowBlock, IPropagatorBlock<T, T[]>, ISourceBlock<T[]>, ITargetBlock<T>, IReceivableSourceBlock<T[]>
{
   private readonly AsyncAutoResetEvent _asyncAutoResetEvent = new AsyncAutoResetEvent();

   private readonly BatchBlock<T> _base;

   private readonly CancellationToken _cancellationToken;

   private readonly int _triggerTimeMs;

   public BatchBlockEx(int batchSize, int triggerTimeMs)
   {
      _triggerTimeMs = triggerTimeMs;
      _base = new BatchBlock<T>(batchSize);
      PollReTrigger();
   }

   public BatchBlockEx(int batchSize, int triggerTimeMs, GroupingDataflowBlockOptions dataflowBlockOptions)
   {
      _triggerTimeMs = triggerTimeMs;
      _cancellationToken = dataflowBlockOptions.CancellationToken;
      _base = new BatchBlock<T>(batchSize, dataflowBlockOptions);
      PollReTrigger();
   }

   public int BatchSize => _base.BatchSize;

   public int OutputCount => _base.OutputCount;

   public Task Completion => _base.Completion;

   public void Complete() => _base.Complete();

   void IDataflowBlock.Fault(Exception exception) => ((IDataflowBlock)_base).Fault(exception);

   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions) => _base.LinkTo(target, linkOptions);

   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed) => ((ISourceBlock<T[]>)_base).ConsumeMessage(messageHeader, target, out messageConsumed);

   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReleaseReservation(messageHeader, target);

   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReserveMessage(messageHeader, target);

   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      _asyncAutoResetEvent.Set();
      return ((ITargetBlock<T>)_base).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
   }

   public bool TryReceive(Predicate<T[]> filter, out T[] item) => _base.TryReceive(filter, out item);

   public bool TryReceiveAll(out IList<T[]> items) => _base.TryReceiveAll(out items);

   public override string ToString() => _base.ToString();

   public void TriggerBatch() => _base.TriggerBatch();

   private void PollReTrigger()
   {
      async Task Poll()
      {
         try
         {
            while (!_cancellationToken.IsCancellationRequested)
            {
               await _asyncAutoResetEvent.WaitAsync()
                                          .ConfigureAwait(false);

               await Task.Delay(_triggerTimeMs, _cancellationToken)
                           .ConfigureAwait(false); 
               TriggerBatch();
            }
         }
         catch (TaskCanceledException)
         {
            // Cancellation was requested: stop polling quietly
         }
      }

      Task.Run(Poll, _cancellationToken);
   }
}

AsyncAutoResetEvent

public class AsyncAutoResetEvent
{
   private static readonly Task _completed = Task.FromResult(true);
   private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
   private bool _signaled;

   public Task WaitAsync()
   {
      lock (_waits)
      {
         if (_signaled)
         {
            _signaled = false;
            return _completed;
         }

         var tcs = new TaskCompletionSource<bool>();
         _waits.Enqueue(tcs);
         return tcs.Task;
      }
   }

   public void Set()
   {
      TaskCompletionSource<bool> toRelease = null;

      lock (_waits)
         if (_waits.Count > 0)
            toRelease = _waits.Dequeue();
         else if (!_signaled)
            _signaled = true;

      toRelease?.SetResult(true);
   }
}
Fetishist answered 6/11, 2018 at 12:20 Comment(0)