How to aggregate results of an IAsyncEnumerable

Is there an existing method, or a plan to add one, for aggregating the results of an IAsyncEnumerable? Given the method below, why is there no terse way to aggregate its results?

public async IAsyncEnumerable<bool> GenerateAsyncEnumerable(int range)
{
    foreach (var x in Enumerable.Range(0, range))
    {
        await Task.Delay(500 * x);
        yield return x % 2 == 0;
    }
}

Current Scenario

public async Task Main()
{
    bool accum = true;
    await foreach (var item in GenerateAsyncEnumerable(3))
    {
        accum &= item;
        //have some side effects on each particular item of the sequence
    }
    Console.WriteLine($"Accumulator:{accum}");
}

Desired Scenario

I would like to aggregate the results of the IAsyncEnumerable with a custom aggregation Func, along these lines:

public async Task Main()
{
    // Hypothetical syntax: await foreach is a statement and does not produce a value
    bool result = await foreach (var item in GenerateAsyncEnumerable(3).Aggregate(true, (x, y) => x & y))
    {
        //have some side effects on each particular item of the sequence
    }
}

P.S. What I dislike about the first scenario is that I have to add an extra local variable, accum, to collect the results of the enumerable. Have I missed something? Is there some syntactic sugar that I am not aware of?

Pipsqueak answered 25/2, 2020 at 10:37 Comment(3)
You could write your own extension method that returns a new IAsyncEnumerable? - Songer
All those methods are part of System.Linq.Async, including AggregateAsync. - Alina
As @PanagiotisKanavos mentioned, there are extension methods in the System.Linq.Async library. One of them is ToAsyncEnumerable, but exercise caution when using it. See #59957123 - Girovard

You could use the AggregateAsync method, from the System.Linq.Async package:

bool result = await GenerateAsyncEnumerable(3).AggregateAsync(true, (x, y) => x & y);
Console.WriteLine($"Result: {result}");

Output:

Result: False
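
If the per-item side effects from the question are still needed, one option is to run them inside the accumulator lambda. The following is only a sketch, assuming the System.Linq.Async package is referenced; the class name SideEffectAggregateSketch, the RunAsync method, and the seen list are hypothetical and stand in for whatever side effect you need. The delay is shortened compared with the question's generator.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical class name, used only for this sketch.
public static class SideEffectAggregateSketch
{
    // Same shape as the question's generator, with a shorter delay.
    public static async IAsyncEnumerable<bool> GenerateAsyncEnumerable(int range)
    {
        foreach (var x in Enumerable.Range(0, range))
        {
            await Task.Delay(10 * x);
            yield return x % 2 == 0;
        }
    }

    public static async Task<(bool Result, List<bool> Seen)> RunAsync()
    {
        var seen = new List<bool>();

        // AggregateAsync comes from System.Linq.Async (assumed to be referenced).
        bool result = await GenerateAsyncEnumerable(3).AggregateAsync(true, (acc, item) =>
        {
            seen.Add(item);     // stand-in for any per-item side effect
            return acc & item;  // the actual aggregation
        });

        return (result, seen);
    }
}
```

This removes the extra local accumulator, at the cost of mixing side effects into a function that is usually expected to be pure.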

Supplant answered 25/2, 2020 at 12:58 Comment(0)

The System.Linq.Async package developed by the ReactiveX team provides LINQ operators for IAsyncEnumerable that are equivalent to those LINQ provides for IEnumerable.

This includes the common operators like Select(), Where(), Take() etc. Aggregate is implemented by AggregateAsync.

The overloads are similar to those of Enumerable.Aggregate, which means you can write:

bool result = await GenerateAsyncEnumerable(3).AggregateAsync(true, (x, y) => x & y);

AggregateAsync carries the Async suffix because it consumes the entire sequence and produces a single result, so it must be awaited. Operators like Select, by contrast, accept an IAsyncEnumerable and return a new one, so there is nothing to await.

You can use this naming convention to find the System.Linq.Async operator you need from the name of the equivalent Enumerable operator.
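
To illustrate the convention, here is a minimal sketch, assuming the System.Linq.Async package is referenced. The class NamingConventionDemo and its RunAsync method are hypothetical names; the operators used (AsyncEnumerable.Range, Where, Select, AggregateAsync, CountAsync) are the package's own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical class name, used only for this sketch.
public static class NamingConventionDemo
{
    public static async Task<(int Sum, int Count)> RunAsync()
    {
        // Sequence-to-sequence operators keep their Enumerable names
        // and are not awaited: nothing runs until the sequence is consumed.
        IAsyncEnumerable<int> evensDoubled = AsyncEnumerable.Range(0, 6)
            .Where(x => x % 2 == 0)
            .Select(x => x * 2);

        // Operators that collapse the sequence to a single value
        // carry the Async suffix and must be awaited.
        int sum = await evensDoubled.AggregateAsync(0, (acc, x) => acc + x);
        int count = await evensDoubled.CountAsync();

        return (sum, count);
    }
}
```

Note that each awaited operator enumerates the source again, so with a generator like the one in the question every aggregate call re-runs the delays.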

Alina answered 25/2, 2020 at 13:7 Comment(0)
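
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Folds the sequence into a single value, starting from seed.
    public static async Task<TOut> AggregateAsync<T, TOut>(
        this IAsyncEnumerable<T> asyncCollection,
        TOut seed,
        Func<TOut, T, TOut> func)
    {
        // Apply func to the running value and each item as it arrives.
        await foreach (var item in asyncCollection)
        {
            seed = func(seed, item);
        }
        return seed;
    }
}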
Yoicks answered 21/3 at 10:8 Comment(1)
Nice, but it also needs ConfigureAwait(false) and WithCancellation to be perfect. - Supplant
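
A possible reading of that suggestion, as a sketch only: the extension method takes an optional CancellationToken and applies both WithCancellation and ConfigureAwait(false) to the sequence before iterating. The class name and the method name AggregateWithCancellationAsync are hypothetical, chosen so the method does not clash with the AggregateAsync from System.Linq.Async.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names, used only for this sketch.
public static class CancellableAggregateSketch
{
    public static async Task<TOut> AggregateWithCancellationAsync<T, TOut>(
        this IAsyncEnumerable<T> source,
        TOut seed,
        Func<TOut, T, TOut> func,
        CancellationToken cancellationToken = default)
    {
        // WithCancellation hands the token to GetAsyncEnumerator;
        // ConfigureAwait(false) avoids resuming on the captured context.
        await foreach (var item in source
            .WithCancellation(cancellationToken)
            .ConfigureAwait(false))
        {
            seed = func(seed, item);
        }
        return seed;
    }
}
```

It would be called like `await GenerateAsyncEnumerable(3).AggregateWithCancellationAsync(true, (x, y) => x & y, token)`. For a compiler-generated async iterator, the token only reaches the iterator body if the method declares a CancellationToken parameter marked with [EnumeratorCancellation], which the generator in the question does not.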
