Schedulers: Immediate vs. CurrentThread
After reading the explanation for why

Observable.Return(5)
  .Repeat()
  .Take(1)

never completes, but

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)

works as expected, I am still confused and can't tell why CurrentThread actually solves the problem. Can somebody give a clear explanation?

Taphole answered 23/6, 2015 at 23:40 Comment(5)
I have tried both and they both complete just fine for me. Can you please post code that reproduces the problem?Sudderth
See this post.Discommode
@Enigmativity the first one locks up for me in LINQPad, although it does print the valueDiscommode
@NedStoyanov - Both worked fine for me in LINQPad. Can you let me know what your test code looked like?Sudderth
@Sudderth using linqpad 4.55.03 with Rx-Main 2.2.5. The code is this: void Main() { Observable.Return(5).Repeat().Take(1).Subscribe(Console.WriteLine);}. It prints 5 but the Executing bar at the bottom keeps showingDiscommode

The link provided by Ned Stoyanov in the comments above has a great explanation by Dave Sexton.

I'll try to illustrate it a bit differently. Take this example where a recursive call occurs in the RecursiveMethod.

public class RecursiveTest
{
    private bool _isDone;

    public void RecursiveMethod()
    {
        if (!_isDone)
        {
            // Re-enters before the assignment below can run.
            RecursiveMethod();

            // Never gets here...
            _isDone = true;
        }
    }
}

You can easily see that this will recurse indefinitely (until a StackOverflowException) because _isDone never gets set to true. It is an oversimplified example, but it is basically what's going on in your first example.

This is the explanation by Dave Sexton to describe what happens in your first example.

By default, Return uses the ImmediateScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return. Because there's no trampoline in Return, this pattern repeats itself, blocking the current thread indefinitely. Calling Subscribe on this observable never returns.

In other words, because of the infinite loop of reentrancy, the initial flow never gets fully completed. So we need a way to complete the initial flow without the reentrancy.

Let's go back to my RecursiveTest example above. What would be the solution to avoid infinite recursion? We would need RecursiveMethod to complete its flow before RecursiveMethod executes again. One way to do this is to have a queue and enqueue the call to RecursiveMethod like this:

public void RecursiveMethod()
{
    if (!_isDone)
    {
        Enqueue(RecursiveMethod);
        _isDone = true;
    }
}  

This way, the initial flow completes and _isDone is set to true, so when the queued call to RecursiveMethod finally executes, it does nothing, which avoids the infinite recursion. This is pretty much what Scheduler.CurrentThread does in your second example.
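To make the Enqueue idea above concrete, here is a minimal sketch of a queue-based version. The Trampoline class, its Enqueue method, the Run method and the Calls counter are all hypothetical names made up for this illustration; this is not Rx's actual implementation, just one way the queue could look under the assumption that everything runs on a single thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of Rx.
// Single-threaded: runs queued actions one after another, never nested.
public sealed class Trampoline
{
    private readonly Queue<Action> _queue = new Queue<Action>();
    private bool _running;

    public void Enqueue(Action action)
    {
        _queue.Enqueue(action);

        // Called from inside a running action: just queue the work and
        // return, so the caller can finish its own flow first.
        if (_running) return;

        _running = true;
        try
        {
            // Drain the queue, including work added while draining.
            while (_queue.Count > 0)
            {
                _queue.Dequeue()();
            }
        }
        finally
        {
            _running = false;
        }
    }
}

public class RecursiveTest
{
    private readonly Trampoline _trampoline = new Trampoline();
    private bool _isDone;

    // Counts invocations of RecursiveMethod (added for the illustration).
    public int Calls { get; private set; }

    public void Run() => _trampoline.Enqueue(RecursiveMethod);

    private void RecursiveMethod()
    {
        Calls++;
        if (!_isDone)
        {
            _trampoline.Enqueue(RecursiveMethod); // queued, not executed yet
            _isDone = true;                       // now reachable
        }
    }
}
```

With this sketch, calling new RecursiveTest().Run() returns normally: the first invocation queues the second and sets _isDone, and the second invocation sees _isDone and does nothing, so Calls ends up at 2.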

Let's see how Dave Sexton explains how your second example works:

Here, Return is using the CurrentThreadScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return; however, this second subscription to Return schedules its (inner) actions on the trampoline because it's still executing on the OnCompleted callback from the first scheduled (outer) action, thus the repetition does not occur immediately. This allows Repeat to return a disposable to Take, which eventually calls OnCompleted, cancels the repetition by disposing Repeat, and ultimately the call from Subscribe returns.

Again, my example is heavily simplified to make it easy to understand, and it's not exactly how the scheduler works. Here you can see how the scheduler really works. It uses what they call a trampoline, which is basically a queue that makes sure there are no reentrant calls. All calls are therefore serialized one after the other on the same thread. By doing so, the initial flow can complete, which avoids the infinite reentrant loop.

Hope this is a bit clearer :)

Atomy answered 24/6, 2015 at 20:48 Comment(3)
Thanks for this answer, it made it really clear for me.Taphole
link is broken: github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/…Petrify
Link is now fixedAtomy
