I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To break the processing, I'm using a CancellationToken
, which effectively stops the subscription.
When cancel has been requested, how do I gracefully finish the current work and terminate properly without losing any data?
Example
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
Output
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
As can be seen from the output, the line "Job terminated" is not the last line, which means that the cleanup would not have had enough time to finish up before the application has terminated.
Expected Output
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.
(Edit: Added expected output)