How to use Rx.NET extension ForEachAsync with async action

I have code which streams data down from SQL and writes it to a different store. The code is approximately this:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        var list = new List<MyData>();
        while (await reader.ReadAsync())
        {
            var row = GetRow(reader);
            list.Add(row);
            if (list.Count == BatchSize)
            {
                await WriteDataAsync(list);
                list.Clear();
            }
        }
        if (list.Count > 0)
        {
            await WriteDataAsync(list);
        }
    }
}

I would like to use Reactive Extensions for this purpose instead. Ideally the code would look like this:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

However, it seems that the extension method ForEachAsync only accepts synchronous actions. Would it be possible to write an extension which would accept an async action?

Contiguity answered 28/7, 2017 at 21:39

Would it be possible to write an extension which would accept an async action?

Not directly.

Rx subscriptions are necessarily synchronous because Rx is a push-based system. When a data item arrives, it travels through your query until it hits the final subscription, which in this case is to execute an Action.

The awaitable methods provided by Rx are awaiting the sequence itself; i.e., ForEachAsync is asynchronous in terms of the sequence (you are asynchronously waiting for the sequence to complete), but the subscription within ForEachAsync (the action taken for each element) must still be synchronous.
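
A minimal, BCL-only sketch of what "the action must still be synchronous" means in practice: Rx's ForEachAsync and Subscribe take an Action<T>, and an async lambda converted to Action<T> compiles to an async void method, so the caller gets control back at the first await and has nothing to wait on. The loop below stands in for an Rx source pushing elements; AsyncVoidDemo, the gate and the semaphore are illustrative names, not Rx APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    public static async Task<(int SeenImmediately, int SeenLater)> Run()
    {
        // Holds every callback at its first await until we release it.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var finished = new SemaphoreSlim(0);
        int completed = 0;

        // Same shape as Rx's onNext: an async lambda stored in an Action<T> is async void.
        Action<int> onNext = async item =>
        {
            await gate.Task;                      // control returns to the caller here
            Interlocked.Increment(ref completed);
            finished.Release();
        };

        // Stand-in for a push-based source: each call returns as soon as the lambda awaits.
        for (int i = 0; i < 3; i++)
        {
            onNext(i);
        }

        int seenImmediately = Volatile.Read(ref completed); // the "source" is done, the work is not

        gate.SetResult(true);                     // let the pending callbacks continue
        for (int i = 0; i < 3; i++)
        {
            await finished.WaitAsync();           // wait for each callback to signal completion
        }

        return (seenImmediately, Volatile.Read(ref completed));
    }
}
```

Run() returns (0, 3): none of the three callbacks had finished when the "source" loop returned, which is the same situation Rx is in when it invokes an async lambda for each element.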

In order to do a sync-to-async transition in your data pipeline, you'll need to have a buffer. An Rx subscription can (synchronously) add to the buffer as a producer while an asynchronous consumer is retrieving items and processing them. So, you'd need a producer/consumer queue that supports both synchronous and asynchronous operations.

The various block types in TPL Dataflow can satisfy this need. Something like this should suffice:

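using System.Collections.Generic;       // IList<T>
using System.Reactive.Linq;             // Buffer (System.Reactive package)
using System.Threading.Tasks.Dataflow;  // ActionBlock, AsObserver (System.Threading.Tasks.Dataflow package)

var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<MyData>>(batch => WriteDataAsync(batch)); // one batch at a time by default
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion; // the observer completes the block when the Rx sequence completes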

Note that there is no backpressure; as quickly as StreamDataFromSql can push data, it'll be buffered and stored in the incoming queue of the ActionBlock. Depending on the size and type of data, this can quickly use a lot of memory.

Hypogenous answered 29/7, 2017 at 1:29
Can you explain "Rx subscriptions are necessarily synchronous because Rx is a push-based system" some more? At first glance I would say that that is not correct, but maybe I've misunderstood what you're saying. - Priapic
@Priapic I mean that there's no built-in automatic backpressure system. The subscription in your answer, for example, is synchronous, not asynchronous. As soon as the first await is hit in your subscription, as far as Rx is concerned the whole subscription method is already done and it's free to start another one. - Hypogenous
Fair enough. I think I should have put in an .ObserveOn call to move the execution away from the UI thread. And then, depending on whether WriteDataAsync uses any UI elements, it could just run off the UI thread and avoid issues with async that way. - Priapic

The correct thing to do is to use Reactive Extensions properly to get this done, starting from the point where you create the connection right up until you write your data.

Here's how:

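using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Reactive.Linq;

IObservable<IList<MyData>> query =
    Observable
        .Using(() =>
        {
            var connection = new SqlConnection(""); // supply your connection string
            connection.Open();                      // ExecuteReader needs an open connection
            return connection;
        }, connection =>
            Observable
                .Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
                    Observable
                        .Using(() => cmd.ExecuteReader(), reader =>
                            Observable
                                // Defer makes GetRow run once per successful Read();
                                // Observable.Return(GetRow(reader)) would call it only once, before the first Read()
                                .While(() => reader.Read(), Observable.Defer(() => Observable.Return(GetRow(reader)))))))
        .Buffer(BatchSize);

// Subscribe takes an Action<IList<MyData>>, so this async lambda is async void: Rx does not await it
IDisposable subscription =
    query
        .Subscribe(async list => await WriteDataAsync(list));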

I couldn't test the code, but it should work. This code assumes that WriteDataAsync can take an IList<MyData> too. If it doesn't, just drop in a .ToList().

Priapic answered 29/7, 2017 at 1:20
Subscribe doesn't take a Func<T,Task>, so the async and await aren't really doing anything. - Dunsinane
@Dunsinane - It's still an Action<T> and that works with async/await. - Priapic

Here is a version of the ForEachAsync method that supports asynchronous actions. It projects the source observable to a nested IObservable<IObservable<Unit>> containing the asynchronous actions, and then flattens it back to an IObservable<Unit> using the Merge operator. The resulting observable is finally converted to a task.

By default the actions are invoked sequentially, but it is possible to invoke them concurrently by configuring the optional maximumConcurrency argument.

Canceling the optional cancellationToken argument results in the immediate completion (cancellation) of the returned Task, potentially before the cancellation of the currently running actions.

Any exception that may occur is propagated through the Task, and causes the cancellation of all currently running actions.

/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, Task> action,
    CancellationToken cancellationToken = default,
    int maximumConcurrency = 1)
{
    // Arguments validation omitted
    return source
        .Select(item => Observable.FromAsync(ct => action(item, ct)))
        .Merge(maximumConcurrency)
        .DefaultIfEmpty()
        .ToTask(cancellationToken);
}

Usage example:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));

Greegree answered 20/11, 2020 at 20:42

Here is the source code for ForEachAsync and an article on the ToEnumerable and AsObservable methods.

We can write an overload of ForEachAsync that awaits a Task-returning function:

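using System;
using System.Reactive.Linq;   // ToEnumerable
using System.Threading.Tasks;

public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
    // ToEnumerable blocks the current thread while it waits for the next element
    foreach ( var x in t.ToEnumerable() )
        await onNext( x );
}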

Example usage:

await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
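
Because ToEnumerable blocks the calling thread while it waits for each element (the concern raised in the comment on this answer), a non-blocking variant of the same wrapper can be sketched by putting a producer/consumer queue between the observer and the awaited action. This sketch swaps in System.Threading.Channels (built into .NET Core 3.0 and later) and is not taken from any answer here; ForEachAwaitAsync, ChannelWriterObserver and the demo-only ArrayObservable are hypothetical names.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ObservableChannelExtensions
{
    // Hypothetical helper, not part of Rx: awaits `action` for each element pushed by `source`,
    // using an unbounded Channel<T> as the sync-to-async buffer.
    public static async Task ForEachAwaitAsync<T>(this IObservable<T> source, Func<T, Task> action)
    {
        var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions { SingleReader = true });

        // Producer: the observer writes synchronously, so it never blocks the Rx pipeline.
        using (source.Subscribe(new ChannelWriterObserver<T>(channel.Writer)))
        {
            // Consumer: WaitToReadAsync returns false after OnCompleted and throws after OnError.
            while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false))
            {
                while (channel.Reader.TryRead(out var item))
                {
                    await action(item).ConfigureAwait(false); // one action at a time
                }
            }
        }
    }

    private sealed class ChannelWriterObserver<T> : IObserver<T>
    {
        private readonly ChannelWriter<T> _writer;
        public ChannelWriterObserver(ChannelWriter<T> writer) => _writer = writer;

        // TryWrite always succeeds on an unbounded channel until it is completed.
        public void OnNext(T value) => _writer.TryWrite(value);
        public void OnError(Exception error) => _writer.TryComplete(error);
        public void OnCompleted() => _writer.TryComplete();
    }
}

// Demo-only source standing in for StreamDataFromSql().Buffer(BatchSize):
// pushes the given items synchronously, then completes.
public sealed class ArrayObservable<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArrayObservable(params T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}
```

With a real Rx query the call would look like await StreamDataFromSql().Buffer(BatchSize).ForEachAwaitAsync(batch => WriteDataAsync(batch)). Like the Dataflow version, the channel is unbounded, so there is still no backpressure: a fast source can fill memory while the writes catch up.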

Didymous answered 28/7, 2017 at 23:58
Very good point. However, by waiting I am essentially losing the benefit of using async in the WriteDataAsync implementation. I am wondering if there is a way to maintain the non-blocking nature of the original code. - Contiguity

© 2022 - 2024 — McMap. All rights reserved.