How to plug a cancellation token into an existing IObservable pipeline before calling Publish on it (i.e., before it becomes an IConnectableObservable)?
This must be part of a cold observable pipeline, before subscribing to it (otherwise, I could pass a CancellationToken to IObservable's Subscribe, RunAsync, ToTask, etc.).
Is there a recommended pattern for this?
I can think of using TakeUntil to achieve that, as suggested by Theodor Zoulias here. For example:
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

async Task Test(CancellationToken token)
{
    var publishedSequence = Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Do(n => Console.WriteLine($"Emitting: {n}"))
        .Skip(3)
        .TakeUntil(
            Observable.Create<long>(
                observer => token.Register(
                    (_, token) => observer.OnError(new OperationCanceledException(token)),
                    null)))
        .Finally(() => Console.WriteLine($"Finally"))
        .Publish();

    using var subscription = publishedSequence.Subscribe(
        onNext: n => Console.WriteLine($"OnNext: {n}"),
        onError: e => Console.WriteLine($"OnError: {e}"),
        onCompleted: () => Console.WriteLine("OnCompleted"));

    using var connection = publishedSequence.Connect();

    await publishedSequence.ToTask();
}

var cts = new CancellationTokenSource(1000);
await Test(cts.Token);
Output:
Emitting: 0
Emitting: 1
Emitting: 2
Emitting: 3
OnNext: 3
Emitting: 4
OnNext: 4
Emitting: 5
OnNext: 5
Emitting: 6
OnNext: 6
Emitting: 7
OnNext: 7
Emitting: 8
OnNext: 8
OnError: System.OperationCanceledException: The operation was canceled.
Finally
I also have a prototype of a custom operator, WithCancellation, which is basically a pass-through IObservable that also listens for a cancellation signal. I'd rather stick with a standard approach though.
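For illustration only, a pass-through operator of this kind might look roughly like the sketch below. It is not the prototype mentioned above: the class name ObservableCancellationExtensions, the choice to signal cancellation through OnError with an OperationCanceledException, and the use of Observer.Synchronize to serialize notifications are all assumptions made for the example.

```csharp
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class ObservableCancellationExtensions
{
    // Hypothetical operator (an assumption, not the author's prototype):
    // forwards all notifications from `source` and raises OnError with an
    // OperationCanceledException once `token` is canceled.
    public static IObservable<T> WithCancellation<T>(
        this IObservable<T> source, CancellationToken token)
    {
        return Observable.Create<T>(observer =>
        {
            // Do not subscribe to the source at all if already canceled.
            if (token.IsCancellationRequested)
            {
                observer.OnError(new OperationCanceledException(token));
                return Disposable.Empty;
            }

            // Serialize calls so a cancellation callback cannot overlap
            // with an OnNext arriving from the source on another thread.
            var gate = Observer.Synchronize(observer);

            var subscription = new SingleAssignmentDisposable();
            var registration = token.Register(() =>
            {
                subscription.Dispose(); // stop listening to the source
                gate.OnError(new OperationCanceledException(token));
            });

            subscription.Disposable = source.Subscribe(gate);
            return new CompositeDisposable(registration, subscription);
        });
    }
}
```

With such an operator, the pipeline above would use .WithCancellation(token) in place of the TakeUntil(...) call, still before Publish.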
Update: I think I've found a race condition in how TakeUntil works (maybe a bug, or just a behavior I can't explain); see the fiddle. I can't repro it with my homegrown WithCancellation implementation (commented out in the fiddle).
Update: I can't repro it either if I use .TakeUntil(Task.Delay(Timeout.Infinite, token).ToObservable()).
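As a rough sketch, the Task.Delay variant could be wrapped in a small helper so it reads as a single step in the pipeline. The names TakeUntilCanceledExtensions and TakeUntilCanceled are hypothetical, and Observable.Defer is an added assumption so that the delay task is created per subscription rather than once when the pipeline is built. When the token is canceled, the delay task becomes canceled, and the sequence should then terminate with an OnError carrying a TaskCanceledException (which derives from OperationCanceledException).

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class TakeUntilCanceledExtensions
{
    // Hypothetical helper: ends `source` with an error once `token` is canceled.
    public static IObservable<T> TakeUntilCanceled<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // Task.Delay(Timeout.Infinite, token) never completes normally;
        // it only transitions to the Canceled state when the token fires.
        var canceled = Observable.Defer(
            () => Task.Delay(Timeout.Infinite, token).ToObservable());

        // TakeUntil forwards an error from the second sequence downstream.
        return source.TakeUntil(canceled);
    }
}
```

In the example above this would be used as .TakeUntilCanceled(token) before .Finally(...) and .Publish().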