The Rx library includes operators that accept lambda parameters, and some of these lambdas are provided with a CancellationToken
that is controlled by the library itself. Some examples of these operators are the FromAsync
, StartAsync
and Create
:
// Converts an asynchronous action into an observable sequence. Each subscription
// to the resulting sequence causes the action to be started. The CancellationToken
// passed to the asynchronous action is tied to the observable sequence's subscription
// that triggered the action's invocation and can be used for best-effort cancellation.
public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync);
I was under the impression that the Rx library does a good job at managing the lifecycle of the CancellationTokenSource
s that obviously has to create behind the scenes, but I am not so sure any more. Let's first state that the documentation insists strongly that the CancellationTokenSource
s should be disposed of:
This type implements the
IDisposable
interface. When you have finished using an instance of the type, you should dispose of it either directly or indirectly. To dispose of the type directly, call itsDispose
method in atry
/catch
block. To dispose of it indirectly, use a language construct such asusing
(in C#) orUsing
(in Visual Basic).
Also from here:
Always call
Dispose
before you release your last reference to theCancellationTokenSource
. Otherwise, the resources it is using will not be freed until the garbage collector calls theCancellationTokenSource
object'sFinalize
method.
I made the experiment below to test my assumptions. It uses reflection to read the private fields _source
and _disposed
of the types CancellationToken
and CancellationTokenSource
respectively (.NET 5).
CancellationToken capturedToken = default;
var subscription = Observable.FromAsync(async token =>
{
capturedToken = token;
token.Register(() => Console.WriteLine("Token canceled"));
await Task.Delay(Timeout.Infinite, token);
})
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(500)))
.Finally(() => Console.WriteLine("The observable was terminated"))
.Subscribe();
Thread.Sleep(1000);
var cts = (CancellationTokenSource)(typeof(CancellationToken)
.GetField("_source", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(capturedToken));
bool disposed = (bool)(typeof(CancellationTokenSource)
.GetField("_disposed", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(cts));
Console.WriteLine($"IsCancellationRequested: {cts.IsCancellationRequested}");
Console.WriteLine($"IsDisposed: {disposed}");
Output:
Token canceled
The observable was terminated
IsCancellationRequested: True
IsDisposed: False
Try it on Fiddle (.NET Framework version, having differently named private fields)
The captured CancellationToken
is inspected half a second after the asynchronous operation has been canceled and the observable has terminated. The _disposed
field has the value false
, indicating that the Dispose
method of the associated CancellationTokenSource
has not been invoked. Am I doing something wrong, or the Rx library indeed omits disposing of the CancellationTokenSource
s it creates?
.NET 5.0.1, System.Reactive 5.0.0, C# 9
IDisposable
. In practice,CancellationTokenSource
only needs disposing if you've used.CancelAfter(...)
orCancellationToken.WaitHandle
, and even then, the timer will clean itself up when it fires, and most people don't bother to dispose mutexes (as doing so often involves introducing a race). It's a bit likeTask.Dispose
: it's there in the API, but not something that's called most of the time in practice – FluterTask.Dispose
documentation states explicitly that: "However, particularly if your app targets the .NET Framework 4.5 or later, there is no need to call Dispose unless performance or scalability testing indicates that, based on your usage patterns, your app's performance would be improved by disposing of tasks.". There is no such subtleties included in the docs for theCancellationTokenSource.Dispose
method! – ElevatedTask.Dispose
. There are plenty of types where disposing, although part of the API, isn't generally necessary in practice, regardless of what the docs say. CTS, like mutexes, is just generally quite annoying to dispose correctly, and since disposing isn't actually required except in very rare cases, the general convention is not to bother. – FluterCancellationTokenSource.Dispose
method is wrong, and the docs should be updated with less strongly-worded language like "Always". Otherwise, are we supposed to stop trusting the documentation in general? – ElevatedCancellationTokenSource
is a type that becomes more and more relevant with each new API that is released, because of how important cooperative cancellation is (Thread.Abort
is not even supported any more). – ElevatedCancellationTokenSource
s. Otherwise our code may become broken in the future, if Microsoft decides to change something in the implementation. – ElevatedMemoryStream
is a classic doesn't-need-to-be-disposed. I already mentioned mutexes, not because they don't need to be, but because they're really hard to dispose – FluterMutex
es (or otherWaitHandle
s) under the hood, and omitted disposing of them? Would you say that it's OK because it is difficult, and library authors are not expected to deal correctly with difficult things? – ElevatedMemoryStream
state that: "This type implements the IDisposable interface, but does not actually have any resources to dispose. This means that disposing it by directly calling Dispose() or by using a language construct such as using (in C#) or Using (in Visual Basic) is not necessary." So it's not a good counterexample IMHO. – ElevatedCancellationTokenSource.Dispose
more than what people say in the internet... – Elevated