I am using the SelectMany operator in order to project the elements of an observable sequence to tasks, and propagate the results of those tasks. Everything is OK if all operations succeed, but I don't like that in case of an exception all currently running operations become fire-and-forget. I would prefer to wait until all pending operations have completed, and only then be notified of the error (or errors) that have occurred. Here is a minimal example of the behavior that I would like to avoid:
try
{
    await Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => (int)x + 1)
        .Take(5)
        .SelectMany(x => Observable.FromAsync(async ct =>
        {
            await Task.Delay(500); // Simulate an I/O operation
            if (x == 3) throw new ApplicationException("Oops!");
            Console.WriteLine($"Operation #{x} completed");
            return x;
        }))
        .Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
    Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);
Output (undesirable):
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed
The desirable output should look like this:
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)
The sequence should propagate the exception of the item #3 only after all other running operations (#4 and #5) have completed.
In the above example I have intentionally not used the CancellationToken argument of the Observable.FromAsync method, because I want to simulate a situation where the launched asynchronous operations are not cancelable, or their cancellation is not instantaneous.
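For contrast, here is a minimal sketch of what a cancelable operation could look like, assuming the token supplied by Observable.FromAsync is simply passed on to Task.Delay. The token is tied to the subscription, so unsubscribing requests cancellation. The `canceled` flag and the timings are illustrative only and are not part of the original example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

bool canceled = false; // Illustrative flag, not part of the original example

IDisposable subscription = Observable
    .FromAsync(async ct =>
    {
        try
        {
            await Task.Delay(5000, ct); // The delay observes the token
            return 1;
        }
        catch (OperationCanceledException)
        {
            canceled = true; // Reached when the token is signaled mid-delay
            throw;
        }
    })
    .Subscribe(x => Console.WriteLine($"Result: {x}"));

await Task.Delay(100);
subscription.Dispose(); // Unsubscribing signals the token
await Task.Delay(200);  // Give the canceled continuation time to run
Console.WriteLine($"Canceled: {canceled}");
```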
I am thinking about implementing a custom operator SelectManyUntilCompletion with the signature below:
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector);
The signature is almost identical to that of SelectMany. The only difference is that the selector delegate has a CancellationToken parameter. This token should be canceled when an error has occurred in any other subsequence. The idea is that instead of unsubscribing abruptly from the subsequences, the operator should communicate the cancellation, but remain subscribed to the subsequences until their natural completion.
My question is: how can I implement this operator? Is it possible to implement it based on the existing SelectMany, or should I go at a lower level, using Observable.Create?
Below is the detailed description of the operator. In case of success its behavior should be identical to that of SelectMany, so I am only describing its behavior in case of errors.
- The resulting sequence should complete when all subsequences have completed.
- The source sequence should be unsubscribed immediately after an error has occurred, so that no more subsequences are created.
- The CancellationToken should be signaled immediately after an error has occurred.
- The resulting sequence should propagate all the TResult values that have been produced by all subsequences before and after the error.
- The resulting sequence should finally propagate all the errors that have occurred, bundled in an AggregateException. This includes a possible error of the source sequence.
- The AggregateException should not include any OperationCanceledExceptions that may have occurred as a result of canceling the CancellationToken.
Marble diagram:
Source:        +-----A-----B-------------C---------------D----|
Subsequence-A:       +-------------a---------|
Subsequence-B:             +---b---------------------X
Subsequence-C:                           +-------c----------------c----|
Subsequence-D:
Result:        +---------------b---a-------------c----------------c----X
The subsequence D was not subscribed because it was emitted after the failure of the subsequence B.
The marble diagram indicates that the subsequence C did not respond promptly to the cancellation signal, which is a valid scenario.
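For illustration, here is a minimal sketch of how an operator with the behavior described above might be composed from existing operators (Defer, TakeUntil, Catch, SelectMany and Concat) rather than Observable.Create. It is one possible approach under stated assumptions, not a definitive implementation: the `ObservableExtensions` class, the `Swallow` helper and the `stopSignal` subject are hypothetical names introduced here, and the sketch does not deal with early unsubscription from the resulting sequence. The usage at the top is the example from the beginning of the question, adapted to the new signature.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

try
{
    await Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => (int)x + 1)
        .Take(5)
        .SelectManyUntilCompletion((x, ct) => Observable.FromAsync(async () =>
        {
            await Task.Delay(500); // Simulate a non-cancelable I/O operation
            if (x == 3) throw new ApplicationException("Oops!");
            Console.WriteLine($"Operation #{x} completed");
            return x;
        }))
        .Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
    Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}

public static class ObservableExtensions // Hypothetical container class
{
    public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, CancellationToken, IObservable<TResult>> selector)
    {
        // Defer gives every subscription its own private state.
        return Observable.Defer(() =>
        {
            var cts = new CancellationTokenSource();
            var errors = new ConcurrentQueue<Exception>();
            var stopSignal = new Subject<Unit>();
            // Subsequences may fail concurrently on different threads.
            var stopObserver = Observer.Synchronize(stopSignal);

            // Records the error, signals the token, stops the source, and
            // replaces the failed sequence with an empty one.
            IObservable<T> Swallow<T>(Exception ex)
            {
                bool causedByCancellation = ex is OperationCanceledException
                    && cts.IsCancellationRequested;
                if (!causedByCancellation) errors.Enqueue(ex);
                cts.Cancel();
                stopObserver.OnNext(Unit.Default);
                return Observable.Empty<T>();
            }

            return source
                .TakeUntil(stopSignal) // Unsubscribe from the source on error
                .Catch((Exception ex) => Swallow<TSource>(ex))
                .SelectMany(item =>
                {
                    // Guard against items that slip through after an error.
                    if (cts.IsCancellationRequested)
                        return Observable.Empty<TResult>();
                    try
                    {
                        return selector(item, cts.Token)
                            .Catch((Exception ex) => Swallow<TResult>(ex));
                    }
                    catch (Exception ex) { return Swallow<TResult>(ex); }
                })
                // Runs only after every subsequence has completed.
                .Concat(Observable.Defer(() => errors.IsEmpty
                    ? Observable.Empty<TResult>()
                    : Observable.Throw<TResult>(new AggregateException(errors))));
        });
    }
}
```

The design choice in this sketch is to convert every failure into an empty sequence, so that SelectMany never sees an error and therefore never unsubscribes from the remaining subsequences. The collected errors are surfaced only by the trailing Concat.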
[…] ct.IsCancellationRequested. I've got an answer but don't have time to post. I'll come back tomorrow. – Beyond
[…] Observable.FromAsync have started before the cancellation token requests cancellation. You have to check. – Beyond
[…] SelectMany/FromAsync pair you are creating fire-and-forget tasks as far as the outer observable is concerned if any of the inner ones fail. I thought I had an option using EventLoopScheduler, but it didn't work. I also tried Materialize but the only thing I got to succeed there was that every value was processed right to the end - getting it to stop and clean up gracefully in the middle has escaped me. – Beyond
[…] Materialize sounds promising, although generally I dislike this operator because I consider it uneconomical. Wrapping each emitted value into a Notification object puts pressure on the garbage collector, which makes me unhappy. But I would accept it as a solution if it worked as expected. – Lodge
[…] Materialize just let the entire sequence run to completion which meant it created all tasks even beyond the point that the first error was raised. I think there's hope, but it's going to be some sort of side-effect or a double task with an unwrap. – Beyond
[…] SelectMany+TakeUntil+Catch+Concat+Defer operators. I may post it tomorrow, after I have double-checked it for bugs. – Lodge