How to implement a custom SelectMany operator that waits for all observable subsequences to complete?

I am using the SelectMany operator to project the elements of an observable sequence to tasks, and propagate the results of those tasks. Everything is OK if all operations succeed, but I don't like that in case of an exception all currently running operations become fire-and-forget. I would prefer to wait until all pending operations have completed, and only then be notified of the error (or errors) that have occurred. Here is a minimal example of the behavior that I would like to avoid:

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

try
{
    await Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => (int)x + 1)
        .Take(5)
        .SelectMany(x => Observable.FromAsync(async ct =>
        {
            await Task.Delay(500); // Simulate an I/O operation
            if (x == 3) throw new ApplicationException("Oops!");
            Console.WriteLine($"Operation #{x} completed");
            return x;
        }))
        .Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
    Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);

Output (undesirable):

Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed


The desired output should look like this:

Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)

The sequence should propagate the exception of item #3 only after all other running operations (#4 and #5) have completed.

In the above example I have intentionally not used the CancellationToken argument of the Observable.FromAsync method, because I want to simulate a situation where the launched asynchronous operations are not cancelable, or their cancellation is not instantaneous.
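To make the distinction concrete, here is a small sketch with plain tasks and no Rx. The names CancellationDemo, NonCooperativeAsync, CooperativeAsync and RunAndCancelAsync are hypothetical helpers, not part of any library; the only real API involved is the Task.Delay(int, CancellationToken) overload. Canceling the token right after starting each operation shows that only the operation which forwards the token stops early, while the other one still runs to completion, which is the situation the example above simulates.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helpers contrasting an operation that ignores its token
// with one that forwards the token to the awaited I/O call.
public static class CancellationDemo
{
    // Ignores the token (like the question's example): runs to completion
    // even if cancellation is requested while it is in progress.
    public static async Task<string> NonCooperativeAsync(CancellationToken ct)
    {
        await Task.Delay(200);
        return "completed";
    }

    // Forwards the token: Task.Delay ends with a TaskCanceledException
    // (an OperationCanceledException) once the token is canceled.
    public static async Task<string> CooperativeAsync(CancellationToken ct)
    {
        await Task.Delay(200, ct);
        return "completed";
    }

    // Starts the operation, requests cancellation immediately, and reports
    // whether the operation completed anyway or was canceled.
    public static async Task<string> RunAndCancelAsync(
        Func<CancellationToken, Task<string>> operation)
    {
        using var cts = new CancellationTokenSource();
        Task<string> task = operation(cts.Token);
        cts.Cancel();
        try { return await task; }
        catch (OperationCanceledException) { return "canceled"; }
    }
}
```

Only the cooperative variant reacts to the token; the non-cooperative one behaves like the fire-and-forget operations #4 and #5 in the output above.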

I am thinking about implementing a custom operator SelectManyUntilCompletion with the signature below:

public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector);

The signature is almost identical to that of SelectMany. The only difference is that the selector delegate has a CancellationToken parameter. This token should be canceled when an error has occurred in any other subsequence. The idea is that, instead of unsubscribing abruptly from the subsequences, the operator should communicate the cancellation but remain subscribed to the subsequences until they complete naturally.

My question is: how can I implement this operator? Is it possible to implement it based on the existing SelectMany, or should I go to a lower level, using Observable.Create?

Below is a detailed description of the operator. In case of success its behavior should be identical to SelectMany, so I am only describing its behavior in case of errors.

  1. The resulting sequence should complete when all subsequences have completed.
  2. The source sequence should be unsubscribed immediately after an error has occurred, so that no more subsequences are created.
  3. The CancellationToken should be signaled immediately after an error has occurred.
  4. The resulting sequence should propagate all the TResult values that have been produced by all subsequences before and after the error.
  5. The resulting sequence should finally propagate all the errors that have occurred, bundled in an AggregateException. This includes a possible error of the source sequence.
  6. The AggregateException should not include any OperationCanceledExceptions that may have occurred as a result of canceling the CancellationToken.
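The rules above are easier to pin down with plain tasks first. The following sketch is a Task-based analogue, not an Rx operator: RunAllAsync is a hypothetical helper that starts every operation, cancels a shared token on the first error (rule 3), waits for all operations to finish (rule 1), keeps the results of the successful ones (rule 4, although it returns them at the end instead of streaming them), and finally throws an AggregateException with every error except the OperationCanceledExceptions caused by its own cancellation (rules 5 and 6). Rule 2 has no counterpart here, because the input is an already materialized collection.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical Task-based analogue of the desired semantics (not Rx).
public static class WaitAllOnError
{
    public static async Task<List<TResult>> RunAllAsync<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, CancellationToken, Task<TResult>> selector)
    {
        using var cts = new CancellationTokenSource();
        var results = new ConcurrentQueue<TResult>();
        var errors = new ConcurrentQueue<Exception>();

        async Task RunOneAsync(TSource item)
        {
            try
            {
                // Rule 4: keep results produced before and after an error.
                results.Enqueue(await selector(item, cts.Token));
            }
            catch (OperationCanceledException) when (cts.IsCancellationRequested)
            {
                // Rule 6: a cancellation we requested ourselves is not an error.
            }
            catch (Exception ex)
            {
                errors.Enqueue(ex);
                cts.Cancel(); // Rule 3: signal the token as soon as an error occurs.
            }
        }

        // Rule 1: wait for every operation, including those still running
        // after the first error.
        await Task.WhenAll(items.Select(item => RunOneAsync(item)).ToList());

        // Rule 5: surface all errors together, after everything has completed.
        if (!errors.IsEmpty) throw new AggregateException(errors);
        return results.ToList();
    }
}
```

With the operations from the question (item #3 throws, the others ignore the token), the AggregateException should only be observed after operations #4 and #5 have completed.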

Marble diagram:

Source:        +-----A-----B-------------C---------------D----|
Subsequence-A:       +-------------a---------|
Subsequence-B:             +---b---------------------X
Subsequence-C:                           +-------c----------------c----|
Subsequence-D:
Result:        +---------------b---a-------------c----------------c----X

Subsequence D was never subscribed, because D was emitted by the source after subsequence B had failed.

The marble diagram also shows that subsequence C did not respond promptly to the cancellation signal (it emitted another c after B failed), which is a valid scenario.

Asked by Lodge on 28/1/2022 at 23:10. Comments (8):
Beyond: You're not checking ct.IsCancellationRequested. I've got an answer but don't have time to post it. I'll come back tomorrow.
Lodge: @Beyond yes, I mentioned in the question that not checking the token is intentional. Not all tasks immediately honor a cancellation request. I don't want to rely on the assumption/hope that the tasks will be canceled shortly. I want to know for a fact that all tasks have definitely completed when the sequence completes.
Beyond: You have a race condition. All of the Observable.FromAsync calls have started before the cancellation token requests cancellation. You have to check.
Lodge: @Beyond could you elaborate on that? My intention is to avoid fire-and-forget tasks, and the assumption is that I am not in control of the implementation of the tasks. The tasks may honor a cancellation request immediately, with a delay, or not at all, and I can't change that.
Beyond: I thought I had an answer. With the SelectMany/FromAsync pair you are creating fire-and-forget tasks, as far as the outer observable is concerned, if any of the inner ones fail. I thought I had an option using EventLoopScheduler, but it didn't work. I also tried Materialize, but the only thing I got to succeed there was that every value was processed right to the end; getting it to stop and clean up gracefully in the middle has escaped me.
Lodge: @Beyond thanks for trying to solve this. Yeah, Materialize sounds promising, although generally I dislike this operator because I consider it uneconomical. Wrapping each emitted value into a Notification object puts pressure on the garbage collector, which makes me unhappy. But I would accept it as a solution if it worked as expected.
Beyond: Unfortunately Materialize just let the entire sequence run to completion, which meant it created all tasks even beyond the point where the first error was raised. I think there's hope, but it's going to be some sort of side effect or a double task with an unwrap.
Lodge: @Beyond I think that I have a working implementation, based on the SelectMany+TakeUntil+Catch+Concat+Defer operators. I may post it tomorrow, after I have double-checked it for bugs.

Here is one solution to this problem. The implementation below is based on the SelectMany operator. All the involved observable sequences have their errors suppressed with a Catch+Empty combo. The errors are aggregated in a ConcurrentQueue<Exception>, and are thrown from a final Concat+Defer combo.
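The error-deferring part of this approach can be tried in isolation, without the cancellation and the stop signal. In the following sketch, MergeWithDeferredErrors is a hypothetical helper: each inner sequence gets a Catch that records the error in a ConcurrentQueue<Exception> and substitutes Observable.Empty, so Merge never terminates early, and the trailing Concat+Defer inspects the queue only after every inner sequence has completed. The outer Defer gives each subscription its own queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical demo of the Catch+Empty / Concat+Defer combination.
public static class DeferredErrorsDemo
{
    public static IObservable<int> MergeWithDeferredErrors(params IObservable<int>[] inners)
    {
        return Observable.Defer(() =>
        {
            // Fresh error queue per subscription, thanks to the outer Defer.
            var errors = new ConcurrentQueue<Exception>();

            // Record the inner error, then complete quietly instead of failing.
            IObservable<int> Suppress(IObservable<int> inner) =>
                inner.Catch((Exception ex) =>
                {
                    errors.Enqueue(ex);
                    return Observable.Empty<int>();
                });

            return inners
                .Select(inner => Suppress(inner))
                .Merge()
                // Subscribed only after the merged sequence has completed,
                // i.e. after every inner sequence has finished.
                .Concat(Observable.Defer(() => errors.IsEmpty
                    ? Observable.Empty<int>()
                    : Observable.Throw<int>(new AggregateException(errors))));
        });
    }
}
```

The values of the healthy inner sequences are still delivered, and the error arrives last, wrapped in an AggregateException.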

using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SelectManyUntilCompletionExtensions
{
    /// <summary>
    /// Projects each element of the source observable sequence to a subsequence,
    /// and merges the resulting subsequences into one observable sequence.
    /// The merged sequence completes when all the projected subsequences complete
    /// on their own. Unlike the SelectMany operator, the subsequences are not
    /// unsubscribed when an error occurs.
    /// </summary>
    public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, CancellationToken, IObservable<TResult>> selector)
    {
        return Observable.Defer(() =>
        {
            var cts = new CancellationTokenSource();
            var errors = new ConcurrentQueue<Exception>();
            var stopSignal = new Subject<Unit>();
            var stopSignalSynchronized = Observer.Synchronize(stopSignal);

            IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
            {
                // Check before canceling: an OperationCanceledException is ignored
                // only if it follows a cancellation that was already requested.
                // (Checking after cts.Cancel() would always be true, and would
                // silently swallow a genuine first OperationCanceledException.)
                bool ignoreError = ex is OperationCanceledException
                    && cts.IsCancellationRequested;
                cts.Cancel();
                if (!ignoreError) errors.Enqueue(ex);
                stopSignalSynchronized.OnNext(default);
                return Observable.Empty<T>();
            }

            return source
                .TakeUntil(stopSignal) // stop creating subsequences after an error
                .Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
                .SelectMany(item =>
                {
                    if (!errors.IsEmpty) return Observable.Empty<TResult>();
                    IObservable<TResult> projected;
                    try { projected = selector(item, cts.Token); }
                    catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
                    return projected
                        .Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
                })
                // Subscribed after the source and all subsequences have completed.
                .Concat(Observable.Defer(() =>
                {
                    cts.Dispose();
                    if (!errors.IsEmpty) throw new AggregateException(errors);
                    return Observable.Empty<TResult>();
                }));
        });
    }
}

In case of an error, a stop signal is propagated through a synchronized Subject<Unit>, and observed by a TakeUntil operator that is chained to the source.
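The stop-signal mechanism can also be tried on its own. In the hypothetical StopSignalDemo below, a Subject<int> stands in for the source, and the signal is raised by hand where the operator above would raise it from an error handler. After the signal, TakeUntil completes and unsubscribes from the source, so later values are not observed. Observer.Synchronize is used because, in the real operator, the error handlers of different subsequences may run concurrently on different threads.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo of a synchronized Subject<Unit> used as a TakeUntil trigger.
public static class StopSignalDemo
{
    public static (List<int> Received, bool Completed) Run()
    {
        var stopSignal = new Subject<Unit>();
        // Serializes OnNext calls, so concurrent error handlers can raise it safely.
        IObserver<Unit> stopSignalSynchronized = Observer.Synchronize(stopSignal);

        var source = new Subject<int>(); // stands in for the real source sequence
        var received = new List<int>();
        bool completed = false;

        using (source.TakeUntil(stopSignal).Subscribe(received.Add, () => completed = true))
        {
            source.OnNext(1);
            source.OnNext(2);
            stopSignalSynchronized.OnNext(Unit.Default); // e.g. raised by an error handler
            source.OnNext(3); // not observed: TakeUntil has already completed
        }
        return (received, completed);
    }
}
```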

Usage example:

//...
.SelectManyUntilCompletion((item, token) => Observable.FromAsync(async () =>
{
    //...
}))
//...
Answered by Lodge on 31/1/2022 at 13:15.
