How to call back an async function from Rx Subscribe?
I would like to call back an async function within an Rx subscription.

For example:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

What needs to be done to catch the exception properly?

Farrica answered 11/4, 2014 at 8:1 Comment(2)
I just figured out that I can put async first. Unfortunately, this doesn't solve the problem I am facing. I will post a more explicit example. - Farrica
Possible duplicate of "Is there a way to subscribe an observer as async". - Chaetopod
T
19

You don't want to pass an async method to Subscribe, because that will create an async void method. Do your best to avoid async void.

In your case, I think what you want is to call the async method for each element of the sequence and then cache all the results. In that case, use SelectMany to call the async method for each element, and Replay to cache (plus a Connect to get the ball rolling):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

I replaced the Results property with the return value of the Trigger method, which I think makes more sense, so the test now looks like:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}
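
As for catching the exception: with this approach, a failure in the async method should reach subscribers as an OnError notification, so awaiting the sequence rethrows it and a plain try/catch applies. A minimal sketch, assuming the System.Reactive package is referenced; ErrorDemo, FailingAsync, and ObserveErrorAsync are hypothetical names, and FailingAsync only stands in for Service.DoAsync:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ErrorDemo
{
    // Hypothetical stand-in for Service.DoAsync from the question: always fails.
    private static async Task<string> FailingAsync()
    {
        await Task.Delay(50);
        throw new ArgumentException("invalid!");
    }

    // Awaits the first result and reports the exception if the sequence fails.
    public static async Task<string> ObserveErrorAsync()
    {
        var connectable = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => FailingAsync())
            .Replay();

        // Connect starts the timer; dispose the connection when done.
        using (connectable.Connect())
        {
            try
            {
                // Replay also replays the error to late subscribers.
                return await connectable.FirstAsync();
            }
            catch (ArgumentException ex)
            {
                return "caught: " + ex.Message;
            }
        }
    }
}
```

In a test, the same idea means wrapping the await in a try/catch or using the test framework's exception assertion.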
Tegucigalpa answered 11/4, 2014 at 11:15 Comment(5)
That is an awesome solution. Thanks Stephen. - Farrica
What if you want to catch exceptions thrown from RunAsync? What I got is: obs.SelectMany(x => Observable.FromAsync(() => RunAsync()).Catch(Observable.Empty<string>())) Is there a better way? - Ropy
@VictorGrigoriu: I recommend you ask your own question and tag it with system.reactive. - Tegucigalpa
This solution does not preserve the ordering of results (just in case a reader is trying to achieve that). To preserve ordering, look at the answers below. - Thunder
And here's @Victor Grigoriu's follow-up question (with an answer): https://mcmap.net/q/452449/-rx-subscribe-with-async-function-and-ignore-errors/68955 - Ribal
D
47

Ana Betts' answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish, so that calls run one at a time and in order, you need something like this:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();

Or:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();
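
To illustrate the ordering difference, here is a rough sketch, assuming System.Reactive is referenced. OrderingDemo and WorkAsync are hypothetical names; WorkAsync makes earlier items slower, so concurrent execution would tend to reorder them:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class OrderingDemo
{
    // Hypothetical async handler: item 0 takes 300 ms, item 1 200 ms, item 2 100 ms.
    private static async Task<int> WorkAsync(int x)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(300 - 100 * x));
        return x;
    }

    // Select + Concat: each call starts only after the previous one has finished,
    // so results arrive in source order.
    public static async Task<IList<int>> SequentialAsync() =>
        await Observable.Range(0, 3)
            .Select(x => Observable.FromAsync(() => WorkAsync(x)))
            .Concat()
            .ToList();

    // SelectMany: all calls start right away and results arrive as they complete,
    // so the order is not guaranteed (here the fastest item typically comes first).
    public static async Task<IList<int>> ConcurrentAsync() =>
        await Observable.Range(0, 3)
            .SelectMany(x => WorkAsync(x))
            .ToList();
}
```

The sequential variant takes roughly the sum of the delays, which is the price of keeping the order.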
Diadromous answered 4/5, 2015 at 12:45 Comment(3)
It's another use case, but very interesting. - Farrica
You must be very careful with this approach. If your async handler throws an exception, the subscription will be terminated, and your handler will stop receiving events. Basically, if you need asynchronous processing of observable events, you have to queue them and consume them in a separate background task. There is no way around it; I learned that the hard way. - Dissever
Relevant discussion: github.com/dotnet/reactive/issues/459 - Twit
H
22

Change the Subscribe call to:

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();

Subscribe doesn't keep the async operation inside the Observable, so its result and any exception never become part of the sequence. SelectMany does.
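
Once the async call lives inside the sequence, its failure can be handled with the onError callback of Subscribe. A small sketch under that assumption, with System.Reactive referenced; OnErrorDemo and FirstNotificationAsync are hypothetical names, and RunAsync here is a stand-in that always fails:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class OnErrorDemo
{
    // Hypothetical stand-in for RunAsync from the question: always fails.
    private static async Task<string> RunAsync()
    {
        await Task.Delay(50);
        throw new ArgumentException("invalid!");
    }

    // Completes with a description of whichever notification arrives first.
    public static Task<string> FirstNotificationAsync()
    {
        var outcome = new TaskCompletionSource<string>();

        Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Subscribe(
                value => outcome.TrySetResult("value: " + value),
                error => outcome.TrySetResult("error: " + error.Message),
                () => outcome.TrySetResult("completed"));

        return outcome.Task;
    }
}
```

Without an onError callback, a parameterless Subscribe() rethrows the error from wherever the notification happens to be delivered, which is usually not where you want to handle it.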

Hamal answered 11/4, 2014 at 16:45 Comment(3)
Thanks, Paul, for your suggestion. Very interesting. Is there somewhere I can read a bit more about that? - Farrica
Note for others using this solution: if you need to keep the order, just replace SelectMany with Select and then .Concat (similar to reijerh's answer). - Ancheta
This will produce an observable sequence of unfinished tasks. What do you do with them next? Your "subscriber" does nothing. All the tasks hang unobserved, with no control over exceptions or termination. This is not a solution. - Dissever
T
3

Building on reijerh's answer, I created an extension method.

public static IDisposable SubscribeAsync<TResult>(this IObservable<TResult> source, Func<Task> action) =>
            source.Select(_ => Observable.FromAsync(action))
                .Concat()
                .Subscribe();

If I understand this correctly, the stream blocks until each async task finishes, while you can still call SubscribeAsync and pass in your task directly. In my opinion, this makes the code a little more readable.

WhenSomethingHappened
    .SubscribeAsync(async () => { 
        await DoFirstAsyncThing(); 
        await DoSecondAsyncThing(); 
    });
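
One caveat raised in the comments above is that an exception in the handler terminates the subscription. A possible variant, sketched here as an assumption rather than a tested recipe, passes each element to the handler and routes handler failures to a callback so later elements are still processed. ObservableAsyncExtensions, SubscribeAsyncDemo, and the onNextAsync/onError parameters are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObservableAsyncExtensions
{
    // Hypothetical variant: runs the handler for one element at a time and
    // reports handler failures to onError instead of ending the subscription.
    // Errors raised by the source itself are not handled here.
    public static IDisposable SubscribeAsync<T>(
        this IObservable<T> source,
        Func<T, Task> onNextAsync,
        Action<Exception> onError) =>
        source
            .Select(item => Observable
                .FromAsync(() => onNextAsync(item))
                .Catch<Unit, Exception>(ex =>
                {
                    onError(ex);
                    return Observable.Empty<Unit>();
                }))
            .Concat()
            .Subscribe();
}

public static class SubscribeAsyncDemo
{
    // Processes 1, 2, 3; the handler fails on 2 but 3 is still handled.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var done = new TaskCompletionSource<bool>();

        using (Observable.Range(1, 3).SubscribeAsync(
            async x =>
            {
                await Task.Delay(10);
                if (x == 2) throw new InvalidOperationException("boom");
                log.Add("handled " + x);
                if (x == 3) done.SetResult(true);
            },
            ex => log.Add("error: " + ex.Message)))
        {
            await done.Task;
        }

        return log;
    }
}
```

In this sketch the log should end up as "handled 1", "error: boom", "handled 3". Whether swallowing handler errors like this is appropriate depends on the application.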

Tuddor answered 12/4, 2022 at 17:0 Comment(0)
