I have an asynchronous sequence (stream) of messages that sometimes arrive in bursts and sometimes sporadically, and I would like to process them in batches of 10 messages per batch. I also want to enforce an upper limit on the latency between receiving a message and processing it, so a batch with fewer than 10 messages should also be processed if 5 seconds have passed since the first message of the batch was received. I found that I can solve the first part of the problem by using the Buffer operator from the System.Interactive.Async package:
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}
The signature of the Buffer operator:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);
Unfortunately, the Buffer operator has no overload with a TimeSpan parameter, so I can't solve the second part of the problem as easily. I'll have to implement a batching operator with a timer myself somehow. My question is: how can I implement a variant of the Buffer operator that has the signature below?
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);
The timeSpan parameter should affect the behavior of the Buffer operator like so:
- A batch must be emitted when the timeSpan has elapsed after emitting the previous batch (or initially after the invocation of the Buffer method).
- An empty batch must be emitted if the timeSpan has elapsed after emitting the previous batch, and no messages have been received during this time.
- Emitting batches more frequently than every timeSpan implies that the batches are full. Emitting a batch with fewer than count messages before the timeSpan has elapsed is not desirable.
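For illustration, the three rules above could be sketched roughly as follows. This is a minimal sketch under assumptions, not a tested or definitive implementation. The class name AsyncEnumerableBufferSketch, the extra cancellationToken parameter, the use of Task.Delay as the timer, and the decision to emit a final partial batch when the source completes are all choices made for this example. None of them comes from an existing library.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableBufferSketch // hypothetical class name
{
    public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
        this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (timeSpan <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timeSpan));
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        IAsyncEnumerator<TSource> enumerator = source.GetAsyncEnumerator(cts.Token);
        // A pending MoveNextAsync is kept across timer ticks, because it must
        // not be invoked again before the previous call has completed.
        Task<bool>? moveNext = null;
        try
        {
            var buffer = new List<TSource>();
            Task timer = Task.Delay(timeSpan, cts.Token);
            while (true)
            {
                moveNext ??= enumerator.MoveNextAsync().AsTask();
                Task winner = await Task.WhenAny(moveNext, timer).ConfigureAwait(false);
                if (ReferenceEquals(winner, timer))
                {
                    // Rethrows if the delay was canceled instead of elapsed.
                    await timer.ConfigureAwait(false);
                    yield return buffer; // possibly empty (rules 1 and 2)
                    buffer = new List<TSource>();
                    timer = Task.Delay(timeSpan, cts.Token);
                    continue;
                }
                bool moved = await moveNext.ConfigureAwait(false);
                moveNext = null;
                if (!moved) break; // the source has completed
                buffer.Add(enumerator.Current);
                if (buffer.Count == count)
                {
                    yield return buffer; // full batch, emitted early (rule 3)
                    buffer = new List<TSource>();
                    timer = Task.Delay(timeSpan, cts.Token); // restart the timer
                }
            }
            // Assumption: leftover messages are emitted when the source ends.
            if (buffer.Count > 0) yield return buffer;
        }
        finally
        {
            cts.Cancel(); // cancels outstanding delays and signals the source
            if (moveNext is not null)
            {
                // Assumes the source honors the token; otherwise this may wait.
                try { await moveNext.ConfigureAwait(false); } catch { }
            }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}
```

With this sketch, source.Buffer(TimeSpan.FromSeconds(5), 10) would be consumed with await foreach, just like the count-only overload. One known trade-off: a Task.Delay that is replaced after a full batch is simply abandoned, so it lingers until it elapses or the enumeration ends. A disposable timer would avoid that at the cost of more code.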
I am OK with adding external dependencies to my project if needed, like the System.Interactive.Async or the System.Linq.Async packages.
P.S. this question was inspired by a recent question related to channels and memory leaks.
System.Reactive. The Buffer method with a TimeSpan parameter can be found in System.Reactive, not System.Interactive. – Barathea
Buffer operator for IAsyncObservable<T>s, not IAsyncEnumerable<T>s. If you think that the unreleased AsyncRx.NET library has a solution for the problem presented in this question, feel free to post it as an answer. If it's a good answer I will upvote it, and I'll even accept it when the package is released (assuming that it will be released eventually). – Mcclean