How can I create a class that is both a Task<T> and an IObservable<T>?
Asked Answered
S

1

-1

Recently I encountered a situation where having an asynchronous operation represented both as a Task<T> and as an IObservable<T> would be advantageous. The task representation maintains the state of the operation (IsCompleted, IsFaulted etc), while the observable representation enables the composition of multiple operations in interesting ways (Concat, Merge, Switch etc), handling automatically the cancellation of any operation that has been unsubscribed along the way, solving this way the problem of fire-and-forgotten asynchronous operations. So I've become interested about ways to combine these two representations.

The easy, and probably correct, way to combine them would be through composition: creating a type that stores internally a Task<T> and an IObservable<T>, and exposes them as two of its properties. But in this question I am interested about the challenging, and probably impractical, possibility of a type that is a Task<T> and is an IObservable<T> at the same time. A type that can be passed directly to APIs that accept either tasks or observables, and do the right thing in either case. So it can't be just a task-like object. It must inherit from the real thing, the Task<T> class itself. Something like this:

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
    {
        //...
    }
}

Creating an AsyncOperation instance should invoke immediately the supplied action. In other words an AsyncOperation should represent a hot task/observable combo.

Is it possible to create such a type?

Btw here is a thread in the ReactiveX/RxJava library that proves that others have thought about this problem before: No "isCompleted" or "isErrored" methods on Observable

Spoonbill answered 14/11, 2020 at 18:20 Comment(7)
Please show your attempts you have tried and the problems/error messages you get from your attempts. Extending from Task<T> doesn't look problematic (it's not sealed), but maybe you can write what the problem is (preferable with a MCVE).Scepter
@Scepter the problem is that all public Task<T> constructors create delegate tasks, while an asynchronous method creates a promise-style task. The constructors associated with promise-style tasks are private or internal.Spoonbill
Let me (quote][learn.microsoft.com/en-us/previous-versions/dotnet/… RX documentation.You do not need to implement the IObservable<T>/IObserver<T> interfaces yourself. Rx provides internal implementations of these interfaces for you and exposes them through various extension methods provided by the Observable and Observer types.Sadonia
@FilipCordas sure. My problem is that the built-in implementations of the IObservable<T> interface, for example the return value of the Observable.StartAsync method, do not inherit from the Task<T> class. So I can't use the functionality of a task for an observable that represents a single asynchronous operation. And this functionality is needed in some scenarios. For example the execution flow may depend on whether the operation has completed successfully or not.Spoonbill
@TheodorZoulias Again I don't see this being needed in any situation. I have never heard of an actual need to inherit from a Task<T> either, I was surprised it wasn't sealed. You can create a task from an observable and you can create an observable from a task.so the seams to be a nonexistent problem.Sadonia
@FilipCordas yeap, you can certainly create an observable and a task that both represent the same asynchronous operation, and then have to carry both of them around in order to use whatever representation you need each time. This is what I did in my answer to this question, and I'm not particularly happy with the looks of it.Spoonbill
@FilipCordas FYI the Observable.StartAsync operator calls internally the Task.ToObservable extension method, that returns a wrapped instance of an internal SlowTaskObservable class. This class stores the underlying task created when the Observable.StartAsync was invoked, but there is no way to get my hands on this task (except by using reflection tricks). My problem would be nonexistent if this task was accessible!Spoonbill
S
0

I found a way to create an observable that inherits from Task, by using a genius technique described by @GlennSlayden in this answer.

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    private readonly IObservable<TResult> _observable;
    private readonly Task<TResult> _promise;

    private AsyncOperation(Func<TResult> function) : base(() => function())
        => function = this.GetResult;

    private TResult GetResult() => _promise.GetAwaiter().GetResult();

    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
        : this((Func<TResult>)null)
    {
        _observable = Observable.StartAsync(action, Scheduler.Immediate);
        _promise = _observable.ToTask();
        _promise.ContinueWith(_ => base.RunSynchronously(), default,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
        => _observable.Subscribe(observer);
}

The above solution is not perfect because an instance of the derived class can never transition to the Canceled state. This is a problem I don't know how to fix, and it may not be fixable, but it's probably not very important. A cancellation emerges as a TaskCanceledException, and handling this exception is the normal way of dealing with canceled tasks anyway.

Interestingly the asynchronous operation can be canceled by creating a dummy subscription and disposing it:

var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });

operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken

I experimented with this class a bit and I found that it's less practical than I initially thought it would be. The problem is that many APIs exist that support both tasks and observables, and are identical otherwise (for example Concat, Merge, Switch, Wait etc). This leads to the frequent appearance of compile-time errors (CS0121 ambiguous call). Resolving the ambiguities is possible by casting the type as either task or observable, but this is awkward, and negates the whole purpose of combining the two types in the first place.


Clarification: The line _promise.GetAwaiter().GetResult() may indicate at first glance that this implementation blocks a ThreadPool thread. This is not the case because the base Task is initially cold, and it's only warmed when the _promise has completed.

Spoonbill answered 16/11, 2020 at 12:3 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.