Observable.Repeat is unstoppable: is it a bug or a feature? [duplicate]

I noticed something strange about the behavior of the Repeat operator when the source observable's notifications are synchronous. The resulting observable cannot be stopped by a subsequent TakeWhile operator, and apparently keeps running forever. To demonstrate this, I created a source observable that produces a single value, which is incremented on every subscription. The first subscriber gets the value 1, the second gets the value 2, etc.:

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();

    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});

Then I attached the operators Repeat, TakeWhile and LastAsync to this observable, so that the program will wait until the composed observable produces its last value:

incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}

Here is the output of this program:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...

It never ends! Although LastAsync has produced its value and has completed, the Repeat operator keeps spinning!

This happens only if the source observable notifies its subscribers synchronously. For example, after uncommenting the line //await Task.Yield(); the program behaves as expected:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done

The Repeat operator stops spinning, although it does not report completion (my guess is that it has been unsubscribed).

Is there any way to achieve consistent behavior from the Repeat operator, irrespective of the type of notifications it receives (sync or async)?

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Console Application

Scorch asked 3/4, 2020 at 12:55 (10 comments)
I do keep saying that Observable.Create is bad... - Decorticate
Well, it's more to do with the Scheduler.Immediate. Change the scheduler to incremental.ObserveOn(Scheduler.Default).Repeat() and see what happens. - Decorticate
I suspect you're locking up the thread that's needed to unsubscribe. - Decorticate
@Decorticate it's not related to Observable.Create. The problem persists with this implementation as well: var incremental = Observable.Defer(() => Observable.Return(++incrementalValue)); - Scorch
Yes, you're right. However, Observable.Create makes it too easy to create observables with this problem. - Decorticate
@Decorticate with ObserveOn(Scheduler.Default) the notifications are received from various threads, essentially mimicking my test with await Task.Yield() inside the producer. But I checked another option, ObserveOn(Scheduler.CurrentThread), which seems to solve the problem cleanly! With this option the program works as expected, and everything happens on a single thread. Are there any drawbacks to ObserveOn(Scheduler.CurrentThread) that I should be aware of? - Scorch
The problem is also solved with SubscribeOn(Scheduler.CurrentThread) before Repeat, God knows why! - Scorch
From memory, Scheduler.CurrentThread uses the trampoline to schedule on the current thread (i.e. the current context), so it isn't running immediately and unblocks the thread. Otherwise, with Scheduler.Immediate you run the risk of deadlocks. - Decorticate
@Decorticate that's what I was guessing, but a Console application has no synchronization context, so it's puzzling. - Scorch
You should look at the source for the System.Reactive.Concurrency namespace. That'll make your eyes water. They do so many tricks in there to make Rx work. There's a sync context in there somewhere. - Decorticate
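A minimal sketch of the workaround reported in the comments (SubscribeOn(Scheduler.CurrentThread) placed before Repeat), assuming the System.Reactive package that the question used (4.3.2). The source is the question's synchronous observable without the Thread.Sleep; RunWithWorkaround is a hypothetical helper name. Per the comments, the pipeline should now stop after TakeWhile rejects 6 and return 5, but that is the commenters' observation, not a documented guarantee of the scheduler.

```csharp
// Assumes the System.Reactive NuGet package (the question used version 4.3.2).
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    // Hypothetical helper: runs the question's pipeline with the workaround applied.
    public static int RunWithWorkaround()
    {
        int incrementalValue = 0;

        // The question's source without Thread.Sleep. It notifies synchronously,
        // because awaiting an already-completed task does not yield.
        var incremental = Observable.Create<int>(async o =>
        {
            await Task.CompletedTask;
            o.OnNext(Interlocked.Increment(ref incrementalValue));
            o.OnCompleted();
        });

        // Workaround from the comments: subscribe to the source on the
        // CurrentThread (trampoline) scheduler, before Repeat re-subscribes to it.
        return incremental
            .SubscribeOn(Scheduler.CurrentThread)
            .Repeat()
            .TakeWhile(item => item <= 5)
            .LastAsync()
            .Wait();
    }

    static void Main() => Console.WriteLine($"Last value: {RunWithWorkaround()}");
}
```

As noted later in the thread, running Repeat on Scheduler.CurrentThread may have other consequences in general, so this is a targeted fix for synchronous sources rather than a default to apply everywhere.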

You might expect an implementation of Repeat to resubscribe to the source whenever it receives the OnCompleted notification, but it turns out that it's implemented by Concat-ing an infinite stream of the source:

    public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
        RepeatInfinite(source).Concat();

    private static IEnumerable<T> RepeatInfinite<T>(T value)
    {
        while (true)
        {
            yield return value;
        }
    }
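Because Repeat hands everything to Concat, the enumerator it produces never ends by itself: MoveNext() always returns true, and the loop stops only when its consumer stops pulling. A plain C# sketch of that property (no Rx involved); InfiniteIteratorDemo is a hypothetical name, and LINQ's Take stands in for a consumer that knows when to stop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class InfiniteIteratorDemo
{
    // Same shape as the RepeatInfinite helper above: a lazy, never-ending iterator.
    public static IEnumerable<T> RepeatInfinite<T>(T value)
    {
        while (true)
        {
            yield return value;
        }
    }

    public static void Main()
    {
        // Take(3) stops pulling after three items, so this terminates.
        var firstThree = RepeatInfinite("source").Take(3).ToList();
        Console.WriteLine(string.Join(", ", firstThree)); // source, source, source

        // Enumerating by hand shows that MoveNext() never reports the end.
        using var e = RepeatInfinite(42).GetEnumerator();
        for (int i = 0; i < 5; i++)
            Console.WriteLine(e.MoveNext()); // True, five times
    }
}
```

A consumer that keeps calling MoveNext() until something external tells it to stop, as Concat does, therefore depends entirely on that external signal arriving.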

With that responsibility shifted to Concat, we can create a simplified version of Concat (the gory implementation details are in TailRecursiveSink.cs). This version still keeps spinning unless await Task.Yield() provides a different execution context.

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Simplified sketch of Concat; the real logic lives in TailRecursiveSink.cs.
static class ConcatExtensions
{
    public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
        Observable.Create<T>(observer =>
        {
            var check = new BooleanDisposable();

            IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
            {
                if (check.IsDisposed)
                    return Disposable.Empty;

                if (enumerator.MoveNext()) //this never returns false
                    return enumerator.Current.Subscribe(
                        observer.OnNext,
                        observer.OnError, // forward errors instead of rethrowing them
                        () => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
                    );
                else
                    return inner.Schedule(observer.OnCompleted); //this never runs
            }

            Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
            return check;
        });
}

Because the stream is infinite, enumerator.MoveNext() always returns true, so the else branch never runs. That's expected; it's not our problem.

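When o.OnCompleted() is called, it immediately schedules the next iteration through Schedule(enumerator, loopRec), which synchronously subscribes again and leads to the next o.OnCompleted(), and so on ad infinitum. There's no point where it can escape this recursion: since Scheduler.Immediate.Schedule(...) never returns, return check; is never reached, so the subscriber never receives a disposable it could use to stop the loop, and check.IsDisposed stays false.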

If await Task.Yield() introduces a context switch, then Schedule(enumerator, loopRec) returns immediately, and o.OnCompleted() is called asynchronously.

Repeat and Concat do their work on the current thread without changing the context. That's not incorrect behavior, but when the same context is also used to push notifications, it can lead to deadlocks or to getting caught in a perpetual trampoline.
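The difference between running repetitions immediately and queueing them can be reproduced without Rx. The following is a toy model in plain C#, not System.Reactive's actual code; Flag stands in for ConcatEx's BooleanDisposable, and the names Flag, RepeatModel, SubscribeImmediate, SubscribeTrampolined, CountImmediate and CountTrampolined are all hypothetical. The "immediate" variant runs every repetition inside Subscribe (a loop replaces the recursion, and a cap replaces "forever" so the demo terminates), so the subscriber only gets its handle once everything is over. The "trampolined" variant, loosely modelled on a CurrentThread-style queue, returns the handle first and runs the repetitions afterwards.

```csharp
using System;
using System.Collections.Generic;

// Toy model, NOT System.Reactive's implementation.
// Flag plays the role of the BooleanDisposable returned by ConcatEx.
public sealed class Flag
{
    public bool Disposed;
}

public static class RepeatModel
{
    // Immediate style: every repetition runs inside "Subscribe", so the Flag is
    // handed back only after the loop ends. `cap` stands in for "forever".
    public static Flag SubscribeImmediate(Func<int> source, Action<int> onNext, int cap)
    {
        var flag = new Flag();
        for (int i = 0; i < cap && !flag.Disposed; i++)
            onNext(source()); // the subscriber cannot reach `flag` yet
        return flag;          // returned too late to stop anything
    }

    // Trampoline style: "Subscribe" only enqueues the first repetition and
    // returns the Flag at once; the caller then drains the queue.
    public static Flag SubscribeTrampolined(Func<int> source, Action<int> onNext,
                                            Queue<Action> trampoline)
    {
        var flag = new Flag();
        void Step()
        {
            if (flag.Disposed) return; // the subscriber asked us to stop
            onNext(source());
            trampoline.Enqueue(Step);  // queue the next repetition instead of recursing
        }
        trampoline.Enqueue(Step);
        return flag;
    }

    // Counts how many values arrive when the subscriber wants to stop at `stopAt`.
    public static int CountImmediate(int stopAt, int cap)
    {
        int counter = 0, seen = 0;
        Flag handle = null;
        handle = SubscribeImmediate(() => ++counter, v =>
        {
            seen++;
            if (v >= stopAt && handle != null) handle.Disposed = true; // handle is still null
        }, cap);
        return seen;
    }

    public static int CountTrampolined(int stopAt)
    {
        int counter = 0, seen = 0;
        var queue = new Queue<Action>();
        Flag handle = null;
        handle = SubscribeTrampolined(() => ++counter, v =>
        {
            seen++;
            if (v >= stopAt) handle.Disposed = true; // handle was returned before draining
        }, queue);
        while (queue.Count > 0) queue.Dequeue()();
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine($"Immediate:   {CountImmediate(stopAt: 5, cap: 1000)} values"); // 1000
        Console.WriteLine($"Trampolined: {CountTrampolined(stopAt: 5)} values");          // 5
    }
}
```

With stopAt = 5, the immediate variant delivers all 1000 capped values because the subscriber's handle is still null when it tries to stop, while the trampolined variant stops after 5. The queue is what breaks the cycle: each repetition is enqueued rather than started from inside the previous repetition's completion, which is the behavior the comments attribute to Scheduler.CurrentThread.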

Annotated Call Stack

[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code] 
Main(args) //incremental.RepeatEx()...
Malvina answered 7/4, 2020 at 7:20 (9 comments)
Thanks Asti. I couldn't hope for a more thorough answer! - Scorch
@TheodorZoulias You're welcome! I used the experience from digging through the source for the other question. - Malvina
Sorry I didn't attempt this earlier! It looked to me like some conclusion was reached in the comments. - Malvina
Yes, we figured out the workaround of using Scheduler.CurrentThread, but what caused the default behavior was still a mystery. 😃 - Scorch
Ah, changing Scheduler.Immediate to Scheduler.CurrentThread in the ConcatEx implementation fixes the problem. - Malvina
But having Repeat run on Scheduler.CurrentThread might lead to a host of other problems. I mean in general. - Malvina
Yep, the other question about the problems of ToObservable().ToEnumerable().ToObservable() was quite revealing. The Rx abstraction starts leaking if you push it too much! - Scorch
Agreed. Most of the time Rx is "don't worry about concurrency", but when you push anything to its limits, the law of leaky abstractions catches up. - Malvina
If the mystery was solved, could you mark this as answered? - Malvina

© 2022 - 2024 — McMap. All rights reserved.