How can I "adapt" a Task<IEnumerable<T>> to IAsyncEnumerable<T>?
Asked Answered
M

5

11

I'm incrementally introducing Ix.NET into a legacy project. I have a number of storage-level APIs that return Task<IEnumerable<T>>, but I want to adapt them to IAsyncEnumerable<T> for consumption in the rest of the system. It seems like there should be a helper method (ala .ToAsyncEnumerable() for IEnumerable) to help with this, but I can't find anything... Do I have to implement my own custom Enumerator? (not hard, but I don't want to re-invent the wheel)

Membranophone answered 27/3, 2019 at 18:20 Comment(2)
Why are you preferring IAsyncEnumerable<T> over IObservable<T>?Complementary
Server-side, I want to stick with IAsyncEnumerable<T> for alignment with C# 8.0 async streams (which I want to eventually adopt for this scenario). On the client, I will be wrapping the response stream produced by this enumerable in an observable.Membranophone
N
8
Task<IEnumerable<T>> GetSomeResults<T>()
{
    throw new NotImplementedException();
}

async IAsyncEnumerable<T> GetAsyncEnumerable<T>()
{
    var results = await GetSomeResults<T>();
    foreach(var item in results)
    {
        yield return item;
    }
}
Norine answered 20/3, 2021 at 7:29 Comment(0)
N
8

As commented by Theodor Zoulias, System.Linq.Async is a NuGet package from .NET Foundation, which supports ToAsyncEnumerable().

Example usage:

    var tasks = new Task[0]; // get your IEnumerable<Task>s
    tasks.ToAsyncEnumerable();
Nowlin answered 5/9, 2022 at 16:28 Comment(0)
R
1

If you're talking about web APIs, Task<IEnumerable<T>> is an asynchronous way of producing a IEnumerable<T>.

Regardless of that IEnumerable<T> being produced synchronously or asynchronously, the whole list will be sent as an HTTP response.

The way you could leverage IAsyncEnumerable<T> on the client is if that client is invoking some kind of streaming or making multiple requests to a server for a unique list of results (paging).

Retinitis answered 31/3, 2019 at 21:5 Comment(6)
Yes, exactly! :-) For server-side, I want to enable asynchronous consumption of the Enumerable, and on the client, I want to move to using streaming responses.Membranophone
How do you expect to do that.Retinitis
On the server, I can naturally process the IAsyncEnumerable using the Linq extension methods in System.Interactive.Async, and for the client, I will construct an observable over a response that uses chunked transfer encoding (ala medium.com/@deaniusaur/…).Membranophone
I'm surely missing something here. An IAsyncEnumerable<T> is used for asynchronously pull data from a source. I don't see an immediate advantage of doing that on a WebAPI. What's pushed to the client is an IEnumerable<T>. Maybe if you could show some code...Retinitis
So, what I want to achieve on the client side is asynchronously streaming entities (of type T) over an open GET request as I fetch them (again, asynchronously) from the data source on the server. That said, this is all getting away from the original question. 😄 Putting the client piece aside, just looking at the server piece, I want to be able to create an IAsyncEnumerable<T> from a Task<IEnumerable<T>>, and I'm just looking for the most elegant way to do that (without writing my own Enumerator).Membranophone
That is, I have the following interface I need to implement: interface IDataSource<T> { IAsyncEnumerable<T> GetData(); } ...and the client I'm using to fetch the data has the following signature: class OldDatabaseClient<T> { Task<IEnumerable<T>> GetData() } What's the simplest (relatively efficient) way to implement the interface?Membranophone
E
1

I was looking for the exact same thing, and due to the replies here I assume that there is indeed no method like AsAsyncEnumerable(). So here's what I ended up doing, maybe it helps someone else:

public static class AsyncEnumerableExtensions {
    public struct AsyncEnumerable<T> : IAsyncEnumerable<T> {
        private readonly IEnumerable<T> enumerable;

        public AsyncEnumerable(IEnumerable<T> enumerable) {
            this.enumerable = enumerable;
        }

        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) {
            return new AsyncEnumerator<T>(enumerable?.GetEnumerator());
        }
    }

    public struct AsyncEnumerator<T> : IAsyncEnumerator<T> {
        private readonly IEnumerator<T> enumerator;

        public AsyncEnumerator(IEnumerator<T> enumerator) {
            this.enumerator = enumerator;
        }

        public ValueTask DisposeAsync() {
            enumerator?.Dispose();
            return default;
        }

        public ValueTask<bool> MoveNextAsync() {
            return new ValueTask<bool>(enumerator == null ? false : enumerator.MoveNext());
        }

        public T Current => enumerator.Current;
    }

    public static AsyncEnumerable<T> AsAsyncEnumerable<T>(this IEnumerable<T> that) {
        return new AsyncEnumerable<T>(that);
    }

    public static AsyncEnumerator<T> AsAsyncEnumerator<T>(this IEnumerator<T> that) {
        return new AsyncEnumerator<T>(that);
    }
}
Evangelin answered 16/6, 2021 at 12:9 Comment(1)
Actually a method like the AsAsyncEnumerable already exists in the System.Linq.Async package (owned by the dotnetfoundation), and it's named ToAsyncEnumerable.Probationer
V
1
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<T> enumerable)
{
    using IEnumerator<T> enumerator = enumerable.GetEnumerator();

    while (await Task.Run(enumerator.MoveNext).ConfigureAwait(false))
    {
        yield return enumerator.Current;
    }
}
Vikiviking answered 12/9, 2022 at 10:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.