I am using a Channel<T> in a producer-consumer scenario, and I have the requirement to consume the channel in batches of 10 items each, without letting any consumed item stay idle in a buffer for more than 5 seconds. This duration is the maximum latency allowed between reading an item from the channel and processing a batch that contains this item. The maximum latency policy takes precedence over the batch size policy, so a batch should be processed even with fewer than 10 items in order to satisfy the max-latency requirement.
I was able to implement the first requirement, in the form of a ReadAllBatches extension method for the ChannelReader<T> class:
public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> channelReader, int batchSize)
{
    List<T> buffer = new();
    while (true)
    {
        T item;
        try { item = await channelReader.ReadAsync(); }
        catch (ChannelClosedException) { break; }
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer.ToArray();
            buffer.Clear();
        }
    }
    if (buffer.Count > 0) yield return buffer.ToArray();
    await channelReader.Completion; // Propagate possible failure
}
I am planning to use it like this:
await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}
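To illustrate the limitation of the size-only version, here is a minimal sketch that reproduces the situation I want to avoid. The Program class, the three-item producer, and the 2-second idle period are assumptions made only for this demo (the idle period stands in for any pause longer than the allowed latency). The three items are read from the channel immediately, but they sit in the buffer until the channel is completed, because the batch never reaches 10 items:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    // Same size-only batching logic as shown above.
    public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
        this ChannelReader<T> channelReader, int batchSize)
    {
        List<T> buffer = new();
        while (true)
        {
            T item;
            try { item = await channelReader.ReadAsync(); }
            catch (ChannelClosedException) { break; }
            buffer.Add(item);
            if (buffer.Count == batchSize)
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }
        if (buffer.Count > 0) yield return buffer.ToArray();
        await channelReader.Completion; // Propagate possible failure
    }
}

public static class Program
{
    public static async Task Main()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        Stopwatch stopwatch = Stopwatch.StartNew();

        // Hypothetical producer: writes 3 items, goes idle, then completes.
        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++) await channel.Writer.WriteAsync(i);
            await Task.Delay(TimeSpan.FromSeconds(2)); // Idle period (demo value)
            channel.Writer.Complete();
        });

        await foreach (int[] batch in channel.Reader.ReadAllBatches(10))
        {
            // The partial batch is emitted only after the channel completes,
            // so the elapsed time reflects the producer's idle period.
            Console.WriteLine($"Processing batch of {batch.Length} items " +
                $"after {stopwatch.ElapsedMilliseconds} ms");
        }
        await producer;
    }
}
```

With a timeout-aware version, I would expect the partial batch to be emitted once the oldest buffered item has waited for the specified duration, without having to wait for more items or for the completion of the channel.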
My question is: how can I enhance my ReadAllBatches<T> implementation with an additional TimeSpan timeout parameter that enforces the aforementioned maximum latency policy, without installing third-party packages in my project?
Important: The requested implementation should not be susceptible to the memory leak issue that has been reported here. The loop that consumes the channel should not cause a steady increase in the memory used by the application when the producer that writes the items to the channel has been idle for a prolonged period of time.
Note: I am aware that a similar question exists regarding batching the IAsyncEnumerable<T> interface, but I am not interested in that. I am interested in a method that directly targets the ChannelReader<T> type, for performance reasons.