Is it possible to use the Using operator in Rx.Net with a resource that implements IAsyncDisposable
rather than IDisposable
? If not, is there some sort of workaround that I could use?
Is it possible to use Rx "Using" operator with IAsyncDisposable?
Asked Answered
Here is a Using
method that works with IAsyncDisposable
objects:
/// <summary>
/// Constructs an observable sequence that depends on a resource object,
/// whose lifetime is tied to the resulting observable sequence's lifetime.
/// </summary>
public static IObservable<TResult> Using<TResult, TResource>(
Func<TResource> resourceFactory,
Func<TResource, IObservable<TResult>> observableFactory)
where TResource : IAsyncDisposable
{
return Observable.Defer(() =>
{
TResource resource = resourceFactory();
IObservable<TResult> observable;
try { observable = observableFactory(resource); }
catch (Exception ex) { observable = Observable.Throw<TResult>(ex); }
Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
IObservable<TResult> disposer = Observable
.FromAsync(() => lazyDisposeTask.Value)
.Select(_ => default(TResult))
.IgnoreElements();
return observable
.Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
.Concat(disposer)
.Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
});
}
This method has identical signature with the Rx Observable.Using
method (apart from the where
clause), and it can be used in the same way.
This implementation takes care of all completion cases:
- Successful completion: The
IAsyncDisposable
resource is disposed asynchronously by theConcat
operator. - Completion with error: The
IAsyncDisposable
resource is disposed asynchronously by theCatch
operator. - The sequence is unsubscribed before its completion: The
IAsyncDisposable
resource is disposed synchronously by theFinally
operator. Disposing asynchronously the resource is not possible in this case, for reasons explained here.
Variant with asynchronous factory methods:
public static IObservable<TResult> Using<TResult, TResource>(
Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
where TResource : IAsyncDisposable
{
return Observable.Create<TResult>(async (observer, cancellationToken) =>
{
TResource resource = await resourceFactoryAsync(cancellationToken);
IObservable<TResult> observable;
try { observable = await observableFactoryAsync(resource, cancellationToken); }
catch { await resource.DisposeAsync(); throw; }
Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
IObservable<TResult> disposer = Observable
.FromAsync(() => lazyDisposeTask.Value)
.Select(_ => default(TResult))
.IgnoreElements();
return observable
.Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
.Concat(disposer)
.Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
.Subscribe(observer);
});
}
@jackdry sure, I updated the answer. –
Ginelle
Thanks a lot. Out of interest, is there a good reason for using
Lazy<Task>
rather than Func<Task>
for "lazyDisposeTask"? –
Vardon @jackdry yes. The
Lazy<Task>
ensures that the DisposeAsync
will be called only once. Most real-world disposables tolerate being disposed multiple times, but better be safe than sorry. :-) –
Ginelle @jackdry FYI I fixed a bug in the implementation of both
Using
methods. –
Ginelle Thank you. Just one more question (if okay!). Can you use
lazyDisposeTask.Value.Wait()
instead oflazyDisposeTask.Value.GetAwaiter().GetResult()
? –
Vardon @jackdry - #17285017 –
Tims
@jackdry it should be noted that the built-in
Observable.Using
method can cause an unhandled exception in case the Dispose
invocation fails, and the same is true for the Using
methods above. A relevant GitHub issue can be found here. Currently I don't know of a perfect solution to this problem. –
Ginelle © 2022 - 2024 — McMap. All rights reserved.
Task<TResource>
? – Vardon