Observable from chained Tasks

I'm trying to create an Observable where each item is produced via an asynchronous task. The next item should be produced via an async call on the result of the previous item (co-recursion). In "Generate" parlance this would look something like the following, except that Generate does not support async (nor does it support a delegate for the initial state).

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev),                  // bool Continue(T)
   async prev => await ProduceNext(prev),   // Task<T> ProduceNext(T)
   item => item
);
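Since Observable.Generate can't express this directly, it may help to pin down the intended order of calls first. The following is a minimal, eager sketch (`CoRecursion.RunAsync` is a hypothetical helper, not part of Rx) that runs the chain to completion with a plain async loop and collects the results into a list. It deliberately has none of the laziness or cancellation an IObservable would provide; it only fixes the semantics.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CoRecursion
{
    // Hypothetical helper: call produceFirst once, then keep calling
    // produceNext on the previous item for as long as shouldContinue holds.
    // Results are collected eagerly into a list.
    public static async Task<List<TResult>> RunAsync<T, TResult>(
        Func<Task<T>> produceFirst,
        Func<T, bool> shouldContinue,
        Func<T, Task<T>> produceNext,
        Func<T, TResult> resultSelector)
    {
        var results = new List<TResult>();
        var state = await produceFirst();
        while (shouldContinue(state))
        {
            results.Add(resultSelector(state));
            // The next item is produced from the previous one (co-recursion).
            state = await produceNext(state);
        }
        return results;
    }
}
```

For example, starting at 1, continuing while the value is at most 3 and adding 1 each step, `RunAsync(() => Task.FromResult(1), x => x <= 3, x => Task.FromResult(x + 1), x => x * 10)` yields 10, 20, 30.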

As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1);
}

And then call .SelectMany(i => i) on the IObservable<IEnumerable<BrokeredMessage>> to turn it into an IObservable<BrokeredMessage>.

Where _serviceBusReceiver is an instance of an interface as follows:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

And BrokeredMessage is from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
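To try the peek-batch logic without a real queue, one option is an in-memory fake of the interface. This is a sketch under assumptions: `FakeServiceBusReceiver` and the stand-in `BrokeredMessage` class are hypothetical (the real message's SequenceNumber is assigned by the broker), and the `fromSequenceNumber` overload is assumed to be inclusive, which is what the `Skip(1)` in ProduceNext relies on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for Microsoft.ServiceBus.Messaging.BrokeredMessage,
// reduced to the one property this example needs.
public class BrokeredMessage
{
    public long SequenceNumber { get; set; }
}

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

// Hypothetical in-memory receiver holding messages numbered 1..count.
// Assumption: the fromSequenceNumber overload includes that message itself.
public class FakeServiceBusReceiver : IServiceBusReceiver
{
    private readonly List<BrokeredMessage> _messages;

    public FakeServiceBusReceiver(int count)
    {
        _messages = Enumerable.Range(1, count)
            .Select(i => new BrokeredMessage { SequenceNumber = i })
            .ToList();
    }

    public Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize) =>
        Task.FromResult<IEnumerable<BrokeredMessage>>(_messages.Take(batchSize).ToList());

    public Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize) =>
        Task.FromResult<IEnumerable<BrokeredMessage>>(
            _messages.Where(m => m.SequenceNumber >= fromSequenceNumber).Take(batchSize).ToList());
}
```

With 250 messages, ProduceFirst would see sequence numbers 1 to 100; PeekBatchAsync(100, 101) followed by Skip(1) would see 101 to 200, then 201 to 250. Finally PeekBatchAsync(250, 101) returns only message 250, so Skip(1) leaves an empty batch and Continue returns false.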

Serranid answered 17/7, 2015 at 22:12 Comment(1)
Could you please make sure that your code is correct? The example Continue and ProduceNext methods do not compile. Also, you need to provide the signatures at least for your _serviceBusClient class and for BrokeredMessage. – Photosynthesis

If you are going to roll your own async Generate function, I would recommend using recursive scheduling instead of wrapping a while loop.

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

This has a couple of advantages. First, you are able to cancel it; with a simple while loop there is no way to cancel it directly, and in fact the subscribe function doesn't even return until the observable has completed. Second, it lets you control the scheduling/asynchrony of each item (which makes testing a breeze), which also makes it a better overall fit for the library.

Depressed answered 18/7, 2015 at 0:59 Comment(8)
Thank you - I will try this out and mark it as the answer if it works (or at least leads me to a working implementation, which it looks like it will). – Serranid
This is very clever, Paul. – Photosynthesis
Thanks Paul and @Enigmativity, they both work. I wish I could mark both as answers. I am going to use Paul's implementation for now, as it has one important advantage (at least for me at present): production waits for consumption. – Serranid
I might be violating the spirit of the reactive paradigm but this is what I need at the moment - it should not continue pulling messages from a queue until they can be processed (at least in the near future). – Serranid
I think that defaulting to Scheduler.Immediate instead of Scheduler.Default is the less surprising behavior. If the caller supplies synchronous implementations of the async lambdas and doesn't supply a scheduler, wouldn't they expect all the lambdas to run on the current thread? – Chockablock
Actually, this solution is problematic because it passes an async lambda to the synchronous scheduler.Schedule method, resulting in an async void lambda. The asynchronous scheduler.ScheduleAsync should be used instead. – Chockablock
@TheodorZoulias if you propose an edit I can take a look at it. It's been a number of years since I have worked with C#, so I have not been keeping up with the intricacies of async scheduling. – Depressed
Paul, here is my suggested improvement. I am not an expert in Rx scheduling either, so this may have other flaws. If you don't trust it, I can post it as a separate answer, to be evaluated separately by future experts. :-) – Chockablock
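One possible shape of the fix discussed in these comments, sketched here rather than taken from the linked edit. It assumes the System.Reactive package, whose `ScheduleAsync` extension (namespace `System.Reactive.Concurrency`) accepts a `Func<IScheduler, CancellationToken, Task>`, so the async body is no longer compiled to `async void`. To keep it short, the sketch loops inside one scheduled action instead of rescheduling itself per item, and it checks the token that Rx cancels when the subscription is disposed. The class name `AsyncObservable` is hypothetical.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncObservable
{
    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector,
        IScheduler scheduler = null)
    {
        var s = scheduler ?? Scheduler.Default;

        return Observable.Create<TResult>(obs =>
            // ScheduleAsync takes a Task-returning delegate; passing the same
            // async lambda to Schedule would turn it into async void.
            s.ScheduleAsync(async (self, ct) =>
            {
                try
                {
                    var state = await initialState();
                    // ct is signaled when the subscription is disposed.
                    while (!ct.IsCancellationRequested && condition(state))
                    {
                        obs.OnNext(resultSelector(state));
                        state = await iterate(state);
                    }
                }
                catch (Exception ex)
                {
                    // Route failures of the async delegates to the observer.
                    obs.OnError(ex);
                    return;
                }
                obs.OnCompleted();
            }));
    }
}
```

The try/catch matters here: without it, an exception from `initialState` or `iterate` would never reach the observer as OnError.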

After doing a good bit of testing I think this does the job nicely using the built-in Rx operators.

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                                .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

I've tested this code with the following:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

And running this sequence:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

I got this result: [output screenshot not preserved]

My test code also used the Reactive Extension team's Interactive Extensions - NuGet "Ix-Main".

Photosynthesis answered 18/7, 2015 at 3:33 Comment(6)
Ah, knew there had to be a pure operator implementation of this! – Depressed
I did have to mix .Create(...) & .Do(...) to be able to introduce isolated state, but I think it works just fine. – Photosynthesis
@Depressed - I just tested yours and found mine had a flaw. Just had to add a where clause to filter out the empty last result. Now they both work the same. Yours is inspired though. Using .Schedule like that is awesome. – Photosynthesis
What's the reason for defaulting to Scheduler.Default instead of Scheduler.Immediate? I think that by not supplying a scheduler the caller wants no scheduling intervention, the equivalent of removing the ObserveOn operator from the query. – Chockablock
In this version of the Generate operator, the condition is applied to the projected output (resultSelector(x)) instead of the current internal x. This is not happening with paulpdaniels's and csauve's versions. Is it intentional? – Chockablock
@TheodorZoulias - Oversight, probably. Or semantic re-imagining. :-) – Photosynthesis

I had a similar question myself, and I agree with the following comment:

I might be violating the spirit of the reactive paradigm but this is what I need at the moment - it should not continue pulling messages from a queue until they can be processed (at least in the near future).

I believe that IAsyncEnumerable from Ix.NET is a better fit than IObservable for this scenario - both for the question here and any similar async unfolding function. The reason is that each time we iterate and then extract a result from a Task, the flow control is with us, the caller, to pull the next item or choose not to if a certain condition is met. This is like IAsyncEnumerable and not like IObservable, which pushes items to us without us having control over the rate.

Ix.NET doesn't have a suitable version of AsyncEnumerable.Generate so I wrote the following to solve this problem.

    public static IAsyncEnumerable<TState> Generate<TState>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, Task<TState>> iterate)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var started = false;
            var current = default(TState);
            return AsyncEnumerable.CreateEnumerator(
                async c =>
                {
                    if (!started)
                    {
                        // First MoveNext: emit the initial state if it satisfies the condition.
                        started = true;
                        var conditionMet = !c.IsCancellationRequested && condition(initialState);
                        if (conditionMet) current = initialState;
                        return conditionMet;
                    }
                    else
                    {
                        // Later MoveNext calls: await the next state, derived from the current one.
                        var newVal = await iterate(current).ConfigureAwait(false);
                        var conditionMet = !c.IsCancellationRequested && condition(newVal);
                        if (conditionMet) current = newVal;
                        return conditionMet;
                    }
                },
                () => current, // value exposed as Current
                () => { });    // nothing to dispose
        });
    }

Notes:

  • Only very lightly tested.
  • Does return the initial state.
  • Does not return the first TState that fails the condition, even though it has done the work to get that result. Possibly a different version could include that.
  • I would prefer to get rid of the condition parameter: since this is a pull system, it's entirely up to the caller whether to call MoveNext, so condition seems redundant (it essentially appends a TakeWhile to the result of the function). However, I haven't looked deeply enough into Ix.NET to know whether a false response from MoveNext is required in order to dispose the IAsyncEnumerator, so for that reason I've included it.
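On C# 8 and later (.NET Core 3.0+, or older targets with the Microsoft.Bcl.AsyncInterfaces package), the same unfold can be written as a compiler-generated async iterator, without Ix.NET. This sketch (`AsyncUnfold.Unfold` is a hypothetical name) drops the condition parameter along the lines of the last note: the consumer simply stops pulling, and breaking out of `await foreach` calls DisposeAsync on the iterator, so no false MoveNext is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncUnfold
{
    // Yields initialState, then repeatedly awaits iterate(previous) and yields
    // the result. There is no condition: the consumer decides when to stop.
    public static async IAsyncEnumerable<TState> Unfold<TState>(
        TState initialState,
        Func<TState, Task<TState>> iterate,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var current = initialState;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return current;
            // Only runs once the consumer asks for the next element.
            current = await iterate(current).ConfigureAwait(false);
        }
    }
}
```

Because `iterate` is awaited only after the consumer requests the next element, a slow consumer throttles production, which is the behavior the quoted comment asks for. For example, iterating `Unfold(1, x => Task.FromResult(x + 1))` and breaking once the value exceeds 3 sees 1, 2, 3, 4 and calls `iterate` three times.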

IAsyncEnumerable can of course be converted to IObservable if that specific type is required.

Asmodeus answered 25/10, 2018 at 3:37 Comment(0)

Here is another implementation, inspired by Enigmativity's answer. It uses newer language features (C# 7 tuple deconstruction).

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
    return Observable.Create<TResult>(observer =>
    {
        var (isFirst, current) = (true, default(TResult));
        return Observable
            .While(() => isFirst || condition(current),
                Observable.If(() => isFirst,
                    Observable.FromAsync(ct => initialState()),
                    Observable.FromAsync(ct => iterate(current))
                )
            )
            .Do(x => (isFirst, current) = (false, x))
            .Select(x => resultSelector(x))
            .ObserveOn(scheduler ?? Scheduler.Immediate)
            .Subscribe(observer);
    });
}
Chockablock answered 28/11, 2020 at 20:15 Comment(0)

I think this might be the correct answer:

This is not a good answer. Do not use.

I created my own Generate that supports async/await on the initial state and iterate functions:

    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector
        )
    {
        return Observable.Create<TResult>(async obs =>
        {
            var state = await initialState();

            while (condition(state))
            {
                var result = resultSelector(state);
                obs.OnNext(result);
                state = await iterate(state);
            }

            obs.OnCompleted();

            return System.Reactive.Disposables.Disposable.Empty;
        });
    }

Unfortunately, this seems to have the side effect that the production of messages races far ahead of consumption. If the observer processes messages slowly, this will fetch millions of messages before we even process a handful of them. Not exactly what we want from a service bus.

I'm going to work through the above, maybe read some more, and will post a more specific question if needed.

Serranid answered 17/7, 2015 at 23:19 Comment(3)
It's almost always the case that if you do a return Disposable.Empty; then you are doing something wrong. Typically you get a stream that cannot be stopped and this can lead to memory leaks and rogue threads. I think you need to reconsider this implementation of Generate. – Photosynthesis
Thank you for that - definitely makes sense. – Serranid
I think that your Generate method is fine. You just need to use the Observable.Create overload that accepts a CancellationToken, and sprinkle the loop with some token.ThrowIfCancellationRequested(); statements. This overload doesn't even require returning an IDisposable, because this role is played by the CancellationToken. You could even consider passing this token to the initialState and iterate methods, to make the loop terminate almost immediately after the unsubscription of the observer. – Chockablock
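A minimal sketch of that suggestion, assuming the System.Reactive package: `Observable.Create` has an overload taking `Func<IObserver<TResult>, CancellationToken, Task>`, whose token is signaled when the subscription is disposed. Passing the token into `initialState` and `iterate` changes their signatures, so the delegate shapes below (and the class name `CancellableGenerate`) are an adaptation, not the question's API.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableGenerate
{
    public static IObservable<TResult> Generate<TResult>(
        Func<CancellationToken, Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, CancellationToken, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector)
    {
        // This Create overload supplies a token tied to the subscription,
        // so the lambda does not return an IDisposable.
        return Observable.Create<TResult>(async (observer, token) =>
        {
            var state = await initialState(token);
            while (condition(state))
            {
                // Stop as soon as the observer has unsubscribed.
                token.ThrowIfCancellationRequested();
                observer.OnNext(resultSelector(state));
                state = await iterate(state, token);
            }
            observer.OnCompleted();
        });
    }
}
```

Disposing the subscription (or applying an operator such as `Take`) cancels the token, so the loop stops at its next check, and any token-aware I/O inside `iterate` can abort early.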
