I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.
My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if for example the producer generates 15 items and then fails, the consumer will get a chunk with the items 1-10 and then will get an exception. The items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:
static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
Output:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.
My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data instead of exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);
Note: Apart from the System.Linq Chunk operator, I also experimented with the Buffer operator from the System.Interactive package, as well as the Batch operator from the MoreLinq package. Apparently they all behave the same way (destructively).
Update: Here is the desirable output of the above example:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
The difference is the line Consumed: [11, 12, 13, 14, 15], which is not present in the actual output.
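For illustration, here is a minimal sketch of how an operator with the requested signature might be written. It is not the built-in Chunk and not taken from any library. It assumes that driving the source enumerator by hand is acceptable, because C# does not allow yield return inside a try block that has a catch clause. The class name ChunkExtensions and the helper name Iterator are hypothetical. A failure from the source is captured, the partially filled chunk is emitted first, and only then is the captured exception rethrown.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

// Hypothetical container class for the sketch; the name is an assumption.
public static class ChunkExtensions
{
    public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
        this IEnumerable<TSource> source, int size)
    {
        // Validate eagerly, before the deferred iterator starts running.
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
        return Iterator(source, size);
    }

    private static IEnumerable<TSource[]> Iterator<TSource>(
        IEnumerable<TSource> source, int size)
    {
        using IEnumerator<TSource> enumerator = source.GetEnumerator();
        var buffer = new List<TSource>();
        ExceptionDispatchInfo? error = null;
        bool finished = false;

        while (!finished)
        {
            // The try/catch wraps only the pull from the source. The yield
            // below sits outside it, which the compiler requires.
            try
            {
                if (enumerator.MoveNext())
                    buffer.Add(enumerator.Current);
                else
                    finished = true;
            }
            catch (Exception ex)
            {
                // Remember the failure, but do not surface it yet.
                error = ExceptionDispatchInfo.Capture(ex);
                finished = true;
            }

            // Emit a full chunk, or whatever is left once the source has
            // completed or failed.
            if (buffer.Count == size || (finished && buffer.Count > 0))
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }

        // Rethrow the captured exception with its original stack trace,
        // after the last partial chunk has been handed to the consumer.
        error?.Throw();
    }
}
```

With the Produce method from the example above, replacing Chunk(10) with ChunkNonDestructive(10) should let the consumer see [11, 12, 13, 14, 15] before the exception propagates. One design consequence is that the exception surfaces on the MoveNext call that follows the partial chunk, so the stack trace will not be identical to the one shown for the built-in operator.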
ChunkNonDestructive operator myself from an existing implementation, I would probably use Microsoft's implementation as starting point. No offense, your implementation might be way better, but no one of us can beat Microsoft single-handedly on reputation points! 😃 – Hydrophone