Here is a polished version of Drew Marsh's idea. This implementation uses the DataflowBlock.Encapsulate method to create a dataflow block that encapsulates the timer+batch functionality. Beyond the new timeout argument, the CreateBatchBlock method also supports all the options available to the normal BatchBlock constructor.
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    // The timer flushes the batchBlock when it fires.
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        // Restart the timer on every message, so it fires only after
        // 'timeout' milliseconds of inactivity.
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
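For illustration, here is a minimal usage sketch (the batch size, the timeout value, and the printBlock consumer are assumptions made for the example, not part of the original code; an async context and the usual System.Threading.Tasks.Dataflow / System.Linq usings are assumed):
// Minimal usage sketch with illustrative values.
var batchBlock = CreateBatchBlock<int>(batchSize: 10, timeout: 5000);
var printBlock = new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Received {batch.Length} items: {String.Join(", ", batch)}"));
batchBlock.LinkTo(printBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var i in Enumerable.Range(1, 25)) batchBlock.Post(i); // 25 items -> batches of 10, 10 and 5
batchBlock.Complete(); // completion flushes the final short batch
await printBlock.Completion;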
Caution: As pointed out by @Jeff in a comment, there is a race condition with this approach. In case the timeout is very small (in the range of milliseconds), the transformBlock will be racing with the timer to pass the data onto the batchBlock, and the timer may fire before the batchBlock has anything in it yet. In the worst case scenario we hang indefinitely: no more messages end up in the queue because they are waiting on a handful of previous ones to complete, but there is one straggler sitting in the latest buffer that will never trigger.
Alternative: below is a BatchUntilInactiveBlock<T> class that offers the whole range of the BatchBlock<T> functionality. This implementation is a thin wrapper around a BatchBlock<T> instance. It has less overhead than the previous CreateBatchBlock implementation, while having similar behavior. It is not affected by the race condition mentioned earlier.
/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
        => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        // Reschedule the inactivity timer only after the inner BatchBlock has
        // accepted the message, so the timer can never fire before the message
        // is actually stored in the block.
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
        => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
            target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
        => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
        => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
The _timer is rescheduled immediately after the BatchBlock<T> has been offered, and has accepted, a message. Since the timer is armed only for messages that are already stored inside the block, it can never fire while the block is still empty, so there is no race.
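A small usage sketch follows (the numbers and the consumer below are illustrative assumptions, and an async context is assumed); the items left over after the full batch are flushed by the inactivity timer as a short batch:
// Illustrative sketch: 8 items are posted at once. The first 5 form a full
// batch immediately, and the remaining 3 are flushed after 500 ms of inactivity.
var block = new BatchUntilInactiveBlock<int>(5, TimeSpan.FromMilliseconds(500));
var consumer = new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Batch: [{String.Join(", ", batch)}]"));
block.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var i in Enumerable.Range(1, 8)) block.Post(i); // the first 5 items form a full batch
await Task.Delay(1000); // the remaining 3 items are flushed by the timer
block.Complete();
await consumer.Completion;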
Disclaimer: The behavior of the above implementations is not ideal, in that they produce short batches even in cases where they shouldn't. The ideal behavior would be to produce a short batch only when that batch can be propagated instantly to a consumer downstream; producing short batches just to store them in the block's output buffer doesn't make much sense. This deviation from the ideal behavior can only be observed when the CreateBatchBlock<T>/BatchUntilInactiveBlock<T> is not rigorously pumped, for example when the linked downstream block is bounded and has reached its maximum capacity.
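As a concrete illustration of this deviation (all values below are assumptions made for the sake of the example, and an async context is assumed), a slow bounded consumer lets the timer-triggered short batches pile up in the output buffer:
// Illustrative sketch: the consumer is bounded and slow, so the timer keeps
// producing single-item batches that merely accumulate in the block's output
// buffer instead of being consumed right away.
var batchBlock = new BatchUntilInactiveBlock<int>(10, TimeSpan.FromMilliseconds(200));
var slowConsumer = new ActionBlock<int[]>(async batch =>
{
    Console.WriteLine($"Consuming a batch of {batch.Length} item(s)");
    await Task.Delay(1000); // simulate slow processing
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
batchBlock.LinkTo(slowConsumer, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
    batchBlock.Post(i);
    await Task.Delay(300); // slower than the 200 ms timeout, so each item becomes its own batch
}
Console.WriteLine($"Short batches still waiting in the output buffer: {batchBlock.OutputCount}");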