How to batch an IAsyncEnumerable<T>, enforcing a maximum interval policy between consecutive batches?
I have an asynchronous sequence (stream) of messages that arrive sometimes in bursts and sometimes sporadically, and I would like to process them in batches of 10 messages per batch. I also want to enforce an upper limit on the latency between receiving a message and processing it, so a batch with fewer than 10 messages should also be processed if 5 seconds have passed since receiving the first message of the batch. I found that I can solve the first part of the problem by using the Buffer operator from the System.Interactive.Async package:

IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}

The signature of the Buffer operator:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

Unfortunately the Buffer operator has no overload with a TimeSpan parameter, so I can't solve the second part of the problem so easily. I'll have to somehow implement a batching operator with a timer myself. My question is: how can I implement a variant of the Buffer operator that has the signature below?

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);

The timeSpan parameter should affect the behavior of the Buffer operator like so:

  1. A batch must be emitted when the timeSpan has elapsed after emitting the previous batch (or initially after the invocation of the Buffer method).
  2. An empty batch must be emitted if the timeSpan has elapsed after emitting the previous batch, and no messages have been received during this time.
  3. Emitting batches more frequently than every timeSpan implies that the batches are full. Emitting a batch with fewer than count messages before the timeSpan has elapsed is not desirable.

I am OK with adding external dependencies to my project if needed, like the System.Interactive.Async or the System.Linq.Async packages.
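To make the desired behavior concrete, this is how the hypothetical overload would be consumed (ProcessBatch is a placeholder for the actual processing logic, not part of any library):

```csharp
IAsyncEnumerable<Message> source = GetStreamOfMessages();

// Hypothetical overload: a batch is emitted either when it is full
// (10 messages), or when 5 seconds have elapsed since the previous batch.
IAsyncEnumerable<IList<Message>> batches = source
    .Buffer(TimeSpan.FromSeconds(5), 10);

await foreach (IList<Message> batch in batches)
{
    // batch.Count is at most 10; it can be smaller (or even zero)
    // when the 5-second interval elapses before the batch fills up.
    ProcessBatch(batch);
}
```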

P.S. This question was inspired by a recent question related to channels and memory leaks.

Mcclean answered 23/5, 2021 at 15:55 Comment(5)
The library that handles time is System.Reactive. The Buffer method with a TimeSpan parameter can be found in System.Reactive, not System.Interactive. – Barathea
Besides, AsyncRx.NET, which provides Reactive operators over async streams, already has a Buffer operator. Combining Reactive and async streams isn't trivial though, which is why it's still in preview. – Barathea
@Panagiotis this question is about asynchronous sequences, not observable sequences. If you think that the functionality available in the System.Reactive package can solve this problem, feel free to post it as an answer. – Mcclean
And the library for this is AsyncRx.NET, not System.Reactive. I posted a link to the very source that provides timespan-and-count buffering over IAsyncEnumerable. – Barathea
@Panagiotis the link you provided contains a Buffer operator for IAsyncObservable<T>s, not IAsyncEnumerable<T>s. If you think that the unreleased AsyncRx.NET library has a solution for the problem presented in this question, feel free to post it as an answer. If it's a good answer I will upvote it, and I'll even accept it when the package is released (assuming that it will be released eventually). – Mcclean

The solution below uses the PeriodicTimer class (.NET 6) for receiving timer notifications, and the Task.WhenAny method for coordinating the timer and enumeration tasks. The PeriodicTimer class is more convenient than the Task.Delay method for this purpose, because it can be disposed directly, instead of requiring an accompanying CancellationTokenSource.
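The difference between the two approaches can be sketched as follows (a minimal illustration, not part of the solution itself):

```csharp
// With Task.Delay, abandoning a pending delay early requires an
// accompanying CancellationTokenSource:
var cts = new CancellationTokenSource();
Task delayTask = Task.Delay(timeSpan, cts.Token);
// ...later: cts.Cancel(); cts.Dispose();
// (and the canceled task must still be observed)

// With PeriodicTimer, disposing the timer is enough: it completes the
// pending WaitForNextTickAsync with a result of false.
var timer = new PeriodicTimer(timeSpan);
Task<bool> tickTask = timer.WaitForNextTickAsync().AsTask();
// ...later: timer.Dispose();
```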

/// <summary>
/// Splits the elements of a sequence into chunks that are emitted when either
/// they are full, or a given amount of time has elapsed after requesting the
/// previous chunk.
/// </summary>
public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    if (timeSpan < TimeSpan.FromMilliseconds(1.0))
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));

    using CancellationTokenSource linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    PeriodicTimer timer = null;
    Task<bool> StartTimer()
    {
        timer = new(timeSpan);
        return timer.WaitForNextTickAsync().AsTask();
    }
    IAsyncEnumerator<TSource> enumerator = source
        .GetAsyncEnumerator(linkedCts.Token);
    Task<bool> moveNext = null;
    try
    {
        List<TSource> buffer = new();
        TSource[] ConsumeBuffer()
        {
            timer?.Dispose();
            TSource[] array = buffer.ToArray();
            buffer.Clear();
            if (buffer.Capacity > count) buffer.Capacity = count;
            return array;
        }
        Task<bool> timerTickTask = StartTimer();
        while (true)
        {
            if (moveNext is null)
            {
                if (timerTickTask.IsCompleted)
                {
                    Debug.Assert(timerTickTask.Result);
                    yield return ConsumeBuffer();
                    timerTickTask = StartTimer();
                }
                moveNext = enumerator.MoveNextAsync().AsTask();
            }
            if (!moveNext.IsCompleted)
            {
                Task completedTask = await Task.WhenAny(moveNext, timerTickTask)
                    .ConfigureAwait(false);
                if (ReferenceEquals(completedTask, timerTickTask))
                {
                    Debug.Assert(timerTickTask.IsCompleted);
                    Debug.Assert(timerTickTask.Result);
                    yield return ConsumeBuffer();
                    timerTickTask = StartTimer();
                    continue;
                }
            }
            Debug.Assert(moveNext.IsCompleted);
            bool moved = await moveNext.ConfigureAwait(false);
            moveNext = null;
            if (!moved) break;
            TSource item = enumerator.Current;
            buffer.Add(item);
            if (buffer.Count == count)
            {
                yield return ConsumeBuffer();
                timerTickTask = StartTimer();
            }
        }
        if (buffer.Count > 0) yield return ConsumeBuffer();
    }
    finally
    {
        // Cancel the enumerator, for more responsive completion.
        try { linkedCts.Cancel(); }
        finally
        {
            // The last moveNext must be completed before disposing.
            if (moveNext is not null && !moveNext.IsCompleted)
                await Task.WhenAny(moveNext).ConfigureAwait(false);
            await enumerator.DisposeAsync().ConfigureAwait(false);
            timer?.Dispose();
        }
    }
}

The timer is restarted each time a chunk is emitted, after the consumer has finished consuming the chunk.

Online demo.

This implementation is destructive, meaning that if the source sequence fails or the enumeration is canceled, any elements that were previously consumed from the source and are still buffered will be lost. See this question for ideas about how to inject a non-destructive behavior.

Care has been taken to avoid leaking fire-and-forget MoveNextAsync operations or timers.

For an implementation that uses the Task.Delay method instead of the PeriodicTimer class, and so can be used on .NET versions earlier than 6.0, you can look at the 7th revision of this answer. That revision also includes a tempting but flawed Rx-based implementation.
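For reference, the core idea of such a Task.Delay-based timer is sketched below (an illustrative fragment under the same `timeSpan` parameter, not the full revision):

```csharp
// A Task.Delay-based "timer" needs a CancellationTokenSource so that the
// pending delay can be discarded when a chunk is emitted early (for
// example because the buffer became full):
CancellationTokenSource timerCts = null;
Task StartTimer()
{
    timerCts = new CancellationTokenSource();
    return Task.Delay(timeSpan, timerCts.Token);
}

// When the buffer is consumed early, the pending delay must be canceled
// and the CTS disposed:
//     timerCts.Cancel(); timerCts.Dispose();
// The canceled delay task must also be observed, otherwise awaiting it
// later would surface a TaskCanceledException.
```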

Mcclean answered 24/5, 2021 at 17:26 Comment(4)
This leaks timers, a limited resource. Unlike the supposed leaks you mentioned, this is an actual leak. The time behavior is untestable, which makes this code very hard to use in all but the simplest cases. Event stream processing is complex, so the ability to test is paramount. As for the supposed problems with Rx or AsyncRx – events come whether you want them or not; AsyncRx is the library you should check, not Rx. – Barathea
And then there are the leaked Tasks created by AsTask(). That's a real leak, not something caused by misuse. Event and async streams are typically long-running, so those tasks add up and put pressure on the GC. That's why the authors of System.Linq.Async, AsyncRx.NET, Rx.NET and even the BCL itself go to great lengths to mitigate this. – Barathea
@Panagiotis as for the tasks created by AsTask(), why do you think that they are leaked? Every single one of them is awaited. The only case where one task can be leaked is if the consumer of the resulting sequence abandons the enumeration without awaiting the last MoveNextAsync operation. Regarding the suitability of AsyncRx.NET for solving this problem, I've already prompted you to post the solution as an answer. I don't know how to do it myself, since the library is not released, so please do. – Mcclean
You haven't asked a question. You're using SO to post an article. At best, you're asking for a review of the "answer". I didn't say tasks are leaked; they're created and need to be GCd though. .NET Core has gone to great lengths to avoid this. As for timers – there's a reason David Fowler warns against using Task.Delay without cancellation. Again, async streams are long-lived and those orphaned items add up. – Barathea

What about using a Channel to achieve the required functionality? Is there any flaw in using something like this extension method to read from a queue until a timeout has expired?

public static async Task<List<T>> ReadWithTimeoutAsync<T>(this ChannelReader<T> reader, TimeSpan readTOut, CancellationToken cancellationToken)
{
    var timeoutTokenSrc = new CancellationTokenSource();
    timeoutTokenSrc.CancelAfter(readTOut);

    var messages = new List<T>();

    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, cancellationToken))
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
            {
                messages.Add(item);
                linkedCts.Token.ThrowIfCancellationRequested();
            }

            Console.WriteLine("All messages read.");
        }
        catch (OperationCanceledException)
        {
            if (timeoutTokenSrc.Token.IsCancellationRequested)
            {
                Console.WriteLine($"Delay ({readTOut.TotalMilliseconds} msec) for reading items from message channel has expired.");
            }
            else if (cancellationToken.IsCancellationRequested)
            {
                Console.WriteLine("Cancelling per user request.");
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    timeoutTokenSrc.Dispose();

    return messages;
}

To combine the timeout with the max. batch size, one more token source could be added:

public static async Task<List<T>> ReadBatchWithTimeoutAsync<T>(this ChannelReader<T> reader, int maxBatchSize, TimeSpan readTOut, CancellationToken cancellationToken)
{
    var timeoutTokenSrc = new CancellationTokenSource();
    timeoutTokenSrc.CancelAfter(readTOut);
    var maxSizeTokenSrc = new CancellationTokenSource();

    var messages = new List<T>();

    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, maxSizeTokenSrc.Token, cancellationToken))
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
            {
                messages.Add(item);
                if (messages.Count >= maxBatchSize)
                {
                    maxSizeTokenSrc.Cancel();
                }
                linkedCts.Token.ThrowIfCancellationRequested();
            }....
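Based on the multi-producer/single-consumer scenario described in the comments below, a consumer could call this method in a loop (ProcessBatchAsync is a hypothetical placeholder for the database write):

```csharp
// Hypothetical consumer loop: read batches until the channel completes.
ChannelReader<Message> reader = channel.Reader;
while (await reader.WaitToReadAsync(cancellationToken))
{
    List<Message> batch = await reader.ReadBatchWithTimeoutAsync(
        maxBatchSize: 10, readTOut: TimeSpan.FromSeconds(5), cancellationToken);
    if (batch.Count > 0)
        await ProcessBatchAsync(batch); // e.g. save the batch to a database
}
```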
Matthieu answered 3/11, 2021 at 10:56 Comment(8)
Thanks Ricky for the answer, but I don't think that it addresses the question that I've asked. The question is about IAsyncEnumerable<T>s, not about ChannelReader<T>s. Even if converters between these two containers were readily available, the return value of your ReadWithTimeoutAsync implementation (Task<List<T>>) is not an enumerable type. I guess that someone could call ReadWithTimeoutAsync in a loop, until some terminating condition was met (which condition exactly?), but this is too far from what the question asks. Btw your implementation lacks an int count parameter. – Mcclean
The use-case would be a multi-producer single-consumer scenario. The consumer loops and cyclically reads from the queue, but not until the queue becomes empty (that will not happen, as the producers are writing more or less continuously to the queue). The consumer cyclically writes the records read from the queue to some database. The focus of my answer is how to achieve the enforcement of the maximum interval policy when reading some continuously arriving asynchronous data. Probably you were thinking of some finite data stream, while in my case the data stream is "infinitely" filled with new data. – Matthieu
Regarding the details of your implementation, the timeoutTokenSrc is probably redundant. linkedCts.CancelAfter(readTOut) should do the job just as well. Also you may want to check out this question. The ReadAllAsync is implemented in a way that may catch you by surprise. – Mcclean
Thanks for the clarification. I also noticed that there is some latency in breaking the ReadAllAsync iteration after canceling the token source. In my case it is not as important, as the timeout is approximate. I just want to save periodically the messages coming through the Channel into a database. I am practically building the batches in the loop task where the new data is extracted from the Channel. – Matthieu
If you want to reduce the latency, you can add this line inside the await foreach loop, after adding the item to the messages list: linkedCts.Token.ThrowIfCancellationRequested(); – Mcclean
Thanks, that line is helping indeed. By the way, I edited my answer with the possibility to combine the timeout with the max. batch size. – Matthieu
I suppose that by forcing the immediate cancellation it is still guaranteed that all the items from the Channel are read (in the next cycles) and no information is lost. – Matthieu
You need to be careful where you put the ThrowIfCancellationRequested. If you put it in the wrong place, you may lose messages. Honestly, I don't think that you need so many CancellationTokenSources. Not only do they make the code more complex, but they also make it less efficient. Controlling the flow by throwing exceptions is expensive. Exceptions should be thrown, in general, only when something exceptional happens. – Mcclean

© 2022 - 2024 — McMap. All rights reserved.