I noticed something strange about the behavior of the Repeat operator when the source observable's notifications are synchronous. The resulting observable cannot be stopped with a subsequent TakeWhile operator, and apparently continues running forever. For demonstration I created a source observable that produces a single value, which is incremented on every subscription. The first subscriber gets the value 1, the second gets the value 2, etc.:
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();
    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});
Then I attached the operators Repeat, TakeWhile and LastAsync to this observable, so that the program waits until the composed observable produces its last value:
incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}
Here is the output of this program:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...
It never ends! Although LastAsync has produced its value and has completed, the Repeat operator keeps spinning!
This happens only if the source observable notifies its subscribers synchronously. For example, after uncommenting the line //await Task.Yield(); the program behaves as expected:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done
The Repeat operator stops spinning, although it does not report completion (my guess is that it has been unsubscribed).
Is there any way to achieve consistent behavior from the Repeat operator, irrespective of the type of notifications it receives (sync or async)?
.NET Core 3.0, C# 8, System.Reactive 4.3.2, Console Application
[…] Observable.Create is bad... – Decorticate
[…] Scheduler.Immediate. Change the scheduler to incremental.ObserveOn(Scheduler.Default).Repeat() and see what happens. – Decorticate
[…] Observable.Create. The problem persists with this implementation as well: var incremental = Observable.Defer(() => Observable.Return(++incrementalValue)); – Scorch
[…] Observable.Create makes it too easy to create observables with this problem. – Decorticate
[…] ObserveOn(Scheduler.Default) the notifications are received from various threads, essentially mimicking my test with await Task.Yield() inside the producer. But I checked another option: ObserveOn(Scheduler.CurrentThread) that seems to solve cleanly the problem! With this option the program works as expected, and everything happens in a single thread. Are there any drawbacks with ObserveOn(Scheduler.CurrentThread) that I should be aware of? – Scorch
[…] SubscribeOn(Scheduler.CurrentThread) before Repeat, God knows why! – Scorch
[…] Scheduler.CurrentThread uses the trampoline to schedule on the current thread (i.e. current context) so it isn't running immediately and unblocks the thread. Otherwise with Scheduler.Immediate you run the risk of deadlocks. – Decorticate
[…] System.Reactive.Concurrency namespace. That'll make your eye water. They do so many tricks in there to make Rx work. There's a sync context in there somewhere. – Decorticate
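For reference, the ObserveOn(Scheduler.CurrentThread) variant discussed in the comments above could be written roughly as follows. This is only a sketch: it assumes the Defer/Return source from the comment thread rather than the original Observable.Create source, the Program class is just scaffolding, and the claim that the pipeline terminates rests on the report in the comments, not on anything verified here.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class Program
{
    static void Main()
    {
        int incrementalValue = 0;

        // Synchronous source taken from the comment thread: every
        // subscription emits the next integer and completes at once.
        var incremental = Observable.Defer(
            () => Observable.Return(++incrementalValue));

        var last = incremental
            // Route notifications through the current-thread trampoline
            // before they reach Repeat (the workaround reported above).
            .ObserveOn(Scheduler.CurrentThread)
            .Repeat()
            .TakeWhile(item => item <= 5)
            .LastAsync()
            .Wait();

        Console.WriteLine($"Last: {last}");
    }
}
```

Placing ObserveOn before Repeat, rather than after it, mirrors the expression incremental.ObserveOn(Scheduler.Default).Repeat() suggested in the comments; whether the placement matters for the trampoline case is not something the thread settles.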