How is an observable subscription gracefully terminated?

I'm attempting to use Reactive Extensions (Rx) to process a stream of data. Processing each element may take some time, though. To stop the processing, I pass a CancellationToken to Subscribe, which effectively ends the subscription.

When cancel has been requested, how do I gracefully finish the current work and terminate properly without losing any data?

Example

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

Output

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

As the output shows, "Job terminated." is not the last line printed, which means the cleanup would not have had enough time to finish before the application terminated.

Expected Output

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.

(Edit: Added expected output)

Manutius asked 19/7, 2017 at 14:37 Comment(3)
What would you want the output to be? - Emulous
In this example, the application thread exits before the subscription does. I can't find any way to wait for the subscription's OnNext-handler to complete its work. I don't want the application to exit before the handler has completed, so "Job terminated" should be the last line printed. - Manutius
Related: How to cancel an observable sequence - Abisha
A
2

Observables are (a)waitable; subscriptions to observables are not. So if you want to wait for your subscription code to complete without resorting to artificial solutions like a ManualResetEvent, make the subscription code a side effect of a derived observable and (a)wait that observable. The example in your question has two additional requirements that complicate matters a bit, but not by much:

  1. You want to do other things between subscribing to the observable and awaiting its completion (Console.ReadLine() etc.).

  2. You want to terminate the observable when a CancellationToken is canceled.

Below is an example of how to address these requirements. It shows just one of the many ways to solve this problem:

using System;
using System.Reactive;       // Unit
using System.Reactive.Linq;  // Observable operators
using System.Threading;

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

// Complete the sequence as soon as the token is canceled.
var withCancellation = observable
    .TakeUntil(Observable.Create<Unit>(observer =>
        cts.Token.Register(() => observer.OnNext(default))));

// The former Subscribe handlers now run as side effects of the sequence.
var withSideEffectsAndCancellation = withCancellation
    .Do(value =>
    {
        Console.WriteLine(value);
        Thread.Sleep(500); // Simulate processing

        if (cts.Token.IsCancellationRequested)
        {
            Console.WriteLine("Cancellation detected on {0}.", value);
            Thread.Sleep(500); // Simulate some time consuming shutdown
            Console.WriteLine("Cleaning up done for {0}.", value);
        }
    }, () => Console.WriteLine("Completed"));

// AutoConnect(0) connects to the source immediately, before anyone subscribes.
var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
    .Publish()
    .AutoConnect(0);

Console.ReadLine();
cts.Cancel();

// Block until the sequence has completed.
hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");

Explanation:

  1. The .TakeUntil...cts.Token.Register... construct is an idiomatic way to unsubscribe from the Interval observable as soon as cts.Token is canceled. It is taken from a related question. You could also use the simpler .TakeWhile(x => !cts.Token.IsCancellationRequested), provided that you are OK with slightly less responsive cancellation: the predicate is only evaluated when the next element arrives.

  2. The Do operator is a natural way to perform the subscription side effects, because it accepts the same parameters as the Subscribe method.

  3. The .Publish().AutoConnect(0) makes the sequence hot right away. Called this way, the AutoConnect operator gives you no handle for disconnecting from the underlying observable (as opposed to the RefCount operator), but in this particular case the disconnect functionality is not needed: the lifetime of the underlying observable is already controlled by the CancellationToken attached earlier.

  4. The .DefaultIfEmpty() before the .Wait() is required to prevent an InvalidOperationException in the edge case where the sequence is canceled before producing any element. It is also required if you await the sequence asynchronously. These mechanisms for waiting on an observable (as well as others, like the RunAsync and ToTask operators) return the last value emitted by the observable, and they fail when no such value exists.
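The edge case in point 4 can be reproduced in isolation. The following is a minimal sketch, not part of the original answer: it assumes the System.Reactive package and C# top-level statements, and it uses Observable.Empty<long>() as a stand-in for a sequence that was canceled before emitting its first element.

```csharp
using System;
using System.Reactive.Linq;

// Stand-in (assumption) for a sequence canceled before its first element:
// it completes without emitting anything.
var empty = Observable.Empty<long>();

bool threw = false;
try
{
    empty.Wait(); // There is no last element to return.
}
catch (InvalidOperationException)
{
    threw = true;
}
Console.WriteLine($"Wait() on the empty sequence threw: {threw}");

// DefaultIfEmpty() emits default(long) when the source is empty,
// so Wait() has a value to return.
long last = empty.DefaultIfEmpty().Wait();
Console.WriteLine($"With DefaultIfEmpty(): {last}");
```

The first line printed reports True and the second reports 0, which is why the answer appends DefaultIfEmpty() before waiting.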

Abisha answered 3/1, 2021 at 0:41 Comment(3)
Thanks for a very well explained answer! I appreciate you trying to stay within Reactive to solve the problem, but this solution still doesn't allow the Do-method to finish its work before terminating. - Manutius
@Manutius yep, you are right. I became overenthusiastic and missed this important detail. I edited the code so that the cancellation is observed before executing the side effects. I think it should now work as expected. The Wait will block the current thread until all processing has completed. - Abisha
If you move Console.WriteLine("Job terminated."); down to the very end, your answer works perfectly! I marked your answer as accepted because it stays within Reactive, which I think is better than using the ManualResetEvent as in my original solution. Thanks for the help! - Manutius
E
2

If I understand the question correctly, this isn't an Rx problem; it's a 'whatever you are doing in the Subscribe' problem. Your subscribe action takes half a second, with a possible cleanup taking another half a second, while your job termination takes microseconds. What are you hoping to squeeze in between cancellation and termination?

The best advice I can give you is to have the subscribe action honor the cancellation token better than the Thread.Sleep calls do.
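One way to read this advice, sketched here as an assumption rather than as the answerer's own code, is to replace a blind Thread.Sleep with a wait on the token's WaitHandle, which returns early when cancellation is requested. The 100 ms CancelAfter and the 5 second timeout are arbitrary values chosen for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(100)); // Simulated cancel request

var stopwatch = Stopwatch.StartNew();

// Unlike Thread.Sleep(5000), this returns as soon as the token is canceled.
// WaitOne returns true if the handle was signaled, false if the timeout elapsed.
bool cancelled = cts.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(5));

stopwatch.Stop();
Console.WriteLine($"Cancelled: {cancelled}, waited about {stopwatch.ElapsedMilliseconds} ms");
```

A handler written this way can notice the cancel request in the middle of its simulated work and move straight to cleanup, instead of sleeping through the full delay.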

Emulous answered 20/7, 2017 at 0:33 Comment(3)
I am hoping for some kind of mechanism that allows the application to wait for the subscription's OnNext-handler to complete its work. - Manutius
@Manutius - Use a WaitHandle. - Wormy
M
2

Using the answer to a similar question together with the answer to a question about waiting before terminating, I figured out a solution that does what I want.

My original problem was that I found no way to wait for the subscription's thread. The answers linked above led me to refactor the code in three ways:

  1. I moved the cancellation-logic away from the subscription into the observable.

  2. The subscription is wrapped in its own Task (so the execution can continue to the ReadLine-statement).

  3. A ManualResetEvent was introduced to control the application exit strategy.

Solution:

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Signalled when the stream has finished its finalization.
var reset = new ManualResetEvent(false);

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250))
    // The producer stops at the first element that arrives after cancellation.
    .TakeWhile(x => !cts.Token.IsCancellationRequested)
    .Finally(
        () =>
            {
                Console.WriteLine("Finally: Beginning finalization.");
                Thread.Sleep(500);
                Console.WriteLine("Finally: Done with finalization.");
                reset.Set();
            });

await Task.Factory.StartNew(
    () => observable
        .Subscribe(
            value =>
                {
                    Console.WriteLine("Begin: {0}", value);
                    Thread.Sleep(2000); // Simulate processing
                    Console.WriteLine("End: {0}", value);
                },
            () => Console.WriteLine("Completed: Subscription completed.")),
    TaskCreationOptions.LongRunning);

Console.ReadLine();
cts.Cancel();
reset.WaitOne(); // Block until the Finally action has run.
Console.WriteLine("Job terminated.");

Output:

Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.

Being quite new to Reactive Extensions, I don't know if this is the best solution to my problem. But it is a great improvement over the example posted in the question, as it fulfills my requirements:

  • Each OnNext-action is allowed to run to completion.
  • The application waits for the stream-processing to complete (signalled by the ManualResetEvent).
  • The stream-cancellation logic is moved to the producer (instead of the consumer) in the TakeWhile-method.
  • The application termination logic is a reaction to the stream-cancellation in the producer's Finally-method.

This is a much nicer solution.
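The way TakeWhile lets the element in flight finish can be seen in a small deterministic sketch. This is an illustration under assumptions, not the code from the solution above: it uses Observable.Range instead of Interval, and it triggers the cancellation from a downstream Do while element 2 is being handled, standing in for a cancel request that arrives during processing.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

var cts = new CancellationTokenSource();

var seen = Observable.Range(0, 10)
    // Producer-side check: evaluated once for each arriving element.
    .TakeWhile(_ => !cts.Token.IsCancellationRequested)
    // Stand-in for the consumer: cancellation is requested while 2 is handled.
    .Do(x => { if (x == 2) cts.Cancel(); })
    .ToList()
    .Wait();

Console.WriteLine(string.Join(", ", seen)); // prints "0, 1, 2"
```

Element 2 is still delivered in full, and the sequence completes only when element 3 reaches the predicate. That is the behavior the first and third bullet points rely on.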

Manutius answered 20/7, 2017 at 9:30 Comment(1)
This solution has been working very well in production for years now. However, @Theodor-Zoulias has an answer that relies entirely on Reactive, which is a nicer solution. - Manutius