The difference between Rx Throttle(...).ObserveOn(scheduler) and Throttle(..., scheduler)
Asked Answered
T

3

6

I have the following code:

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

As expected, UpdateUi() will always execute on the main thread. When I change the code to

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50))
                                       .ObserveOn(RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

UpdateUI() will be executed in a background thread.

Why is not Throttle(...).ObserveOn(scheduler) equivalent to Throttle(..., scheduler)?

Tarnetgaronne answered 2/3, 2015 at 17:37 Comment(0)
S
5

In both examples in code you've given UpdateUi will always be invoked on the scheduler specified by RxApp.MainThreadScheduler. I can say this with some certainty since ObserveOn is a decorator that ensures the OnNext handler of subscribers is called on the specified scheduler. See here for an in-depth analysis.

So that said, this is a bit puzzling. Either RxApp.MainThreadScheduler is not referring to the correct dispatcher scheduler or UpdateUi is transitioning off the dispatcher thread. The former is not unprecedented - see https://github.com/reactiveui/ReactiveUI/issues/768 where others have run into this. I have no idea what the issue was in that case. Perhaps @PaulBetts can weigh in, or you could raise an issue at https://github.com/reactiveui/. Whatever the case, I would carefully check your assumptions here since I would expect this to be a well tested area. Do you have a complete repro?

As to your specific question, the difference between Throttle(...).ObserveOn(scheduler) and Throttle(..., scheduler) is as follows:

In the first case when Throttle is specified without a scheduler it will use the default platform scheduler to introduce the concurrency necessary to run it's timer - on WPF this would use a thread pool thread. So all the throttling will be done on a background thread and, due to the following ObserveOn the released events only will be passed to the subscriber on the specified scheduler.

In the case where Throttle specifies a scheduler, the throttling is done on that scheduler - both suppressed events and released events will be managed on that scheduler and the subscriber will be called on that same scheduler too.

So either way, the UpdateUi will be called on the RxApp.MainThreadScheduler.

You are best off throttling ui events on the dispatcher in most cases since it's generally more costly to run separate timers on a background thread and pay for the context switch if only a fraction of events are going to make it through the throttle.

So, just to check you haven't run into an issue with RxApp.MainThreadScheduler, I would try specifying the scheduler or SynchronizationContext explicitly via another means. How to do this will depend on the platform you are on - ObserveOnDispatcher() is hopefully available, or use a suitable ObserveOn overload. There are options for controls, syncronizationcontexts and schedulers given the correct Rx libraries are imported.

Surface answered 2/3, 2015 at 21:45 Comment(2)
Thank you, turns out there seems to be an issue with my MainThreadScheduler (see https://mcmap.net/q/1667017/-the-difference-between-rx-throttle-observeon-scheduler-and-throttle-scheduler)Tarnetgaronne
I've just found out, there's a bit more about the difference between the two.Cockshy
T
5

After some investigation I believe this is caused by a different version of Rx being used run time than I expect (I develop a plugin for a third-party application).

I'm not sure why, but it seems that the default RxApp.MainThreadScheduler fails to initialize correctly. The default instance is a WaitForDispatcherScheduler (source). All functions in this class rely attemptToCreateScheduler:

    IScheduler attemptToCreateScheduler()
    {
        if (_innerScheduler != null) return _innerScheduler;
        try {
            _innerScheduler = _schedulerFactory();
            return _innerScheduler;
        } catch (Exception) {
            // NB: Dispatcher's not ready yet. Keep using CurrentThread
            return CurrentThreadScheduler.Instance;
        }
    }

What seems to happen in my case is that _schedulerFactory() throws, resulting in CurrentThreadScheduler.Instance to be returned instead.

By manually initializing the RxApp.MainThreadScheduler to new SynchronizationContextScheduler(SynchronizationContext.Current) behavior is as expected.

Tarnetgaronne answered 3/3, 2015 at 9:15 Comment(0)
C
1

I've just bumped into an issue that first led me to this question and then to some experimenting.

It turns out, Throttle(timeSpan, scheduler) is clever enough to "cancel" an already scheduled debounced event X, in case the source emits another event Y before X gets observed. Thus, only Y will be eventually observed (provided it's the last debounced event).

With Throttle(timeSpan).ObserveOn(scheduler), both X and Y will be observed.

So, conceptually, that's an important difference between the two approaches. Sadly, Rx.NET docs are scarce, but I believe this behavior is by design and it makes sense to me.

To illustrate this with an example (fiddle):

#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using static System.Console;

public class Program
{
    static async Task ThrottleWithScheduler()
    {
        WriteLine($"\n{nameof(ThrottleWithScheduler)}\n");

        var sc = new CustomSyncContext();
        var scheduler = new SynchronizationContextScheduler(sc);
        var subj = new BehaviorSubject<string>("A");

        subj
            .Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
            .Throttle(TimeSpan.FromMilliseconds(500), scheduler)
            .Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));

        await Task.Delay(100);
        subj.OnNext("B");
        await Task.Delay(200);
        subj.OnNext("X");
        await Task.Delay(550);
        subj.OnNext("Y");

        await Task.Delay(2000);
        WriteLine("Finished!");
    }

    static async Task ThrottleWithObserveOn()
    {
        WriteLine($"\n{nameof(ThrottleWithObserveOn)}\n");

        var sc = new CustomSyncContext();
        var scheduler = new SynchronizationContextScheduler(sc);
        var subj = new BehaviorSubject<string>("A");

        subj
            .Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
            .Throttle(TimeSpan.FromMilliseconds(500))
            .ObserveOn(scheduler)
            .Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));

        await Task.Delay(100);
        subj.OnNext("B");
        await Task.Delay(200);
        subj.OnNext("X");
        await Task.Delay(550);
        subj.OnNext("Y");

        await Task.Delay(2000);
        WriteLine("Finished!");
    }

    public static async Task Main()
    {
        await ThrottleWithScheduler();
        await ThrottleWithObserveOn();
    }
}

class CustomSyncContext : SynchronizationContext
{
    private readonly Stopwatch _sw = Stopwatch.StartNew();
    public long Elapsed { get { lock (_sw) { return _sw.ElapsedMilliseconds; } } }
    public override void Post(SendOrPostCallback d, object? state)
    {
        WriteLine($"Scheduled on {Elapsed}ms");
        Task.Delay(100).ContinueWith(
            continuationAction: _ =>
            {
                WriteLine($"Executed on {Elapsed}ms");
                d(state);
            },
            continuationOptions: TaskContinuationOptions.ExecuteSynchronously);
    }
}

Output:

ThrottleWithScheduler

Emitted A on 18ms
Emitted B on 142ms
Emitted X on 351ms
Scheduled on 861ms
Emitted Y on 907ms
Executed on 972ms
Scheduled on 1421ms
Executed on 1536ms
Observed Y on 1539ms
Finished!

ThrottleWithObserveOn

Emitted A on 4ms
Emitted B on 113ms
Emitted X on 315ms
Scheduled on 837ms
Emitted Y on 886ms
Executed on 951ms
Observed X on 953ms
Scheduled on 1391ms
Executed on 1508ms
Observed Y on 1508ms
Finished!
Cockshy answered 7/11, 2021 at 22:57 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.