What's the appropriate way to use schedulers in derived properties to have a responsive UI?

I'm having a hard time trying to figure out the proper way to schedule long-running reactive property "getters" in my ViewModel.

This excerpt from Intro to RX describes exactly what I want to do:

  • respond to some sort of user action
  • do work on a background thread
  • pass the result back to the UI thread
  • update the UI

Only in this case, besides user interaction, I also want to react to changes in other properties.

Below is the generic template I am using to get a derived property from an original property (in the actual code, there are chains of cascading derived properties).

In a reactive ViewModel (inheriting from ReactiveObject) I already have some properties that derive from others. For example, when Original changes, Derived is recalculated.

    public TOriginal Original
    {
        get { return _original; }
        set { this.RaiseAndSetIfChanged(ref _original, value); }
    }
    TOriginal _original;


    public TDerived Derived { get { return _derived.Value; } }
    readonly ObservableAsPropertyHelper<TDerived> _derived;


    this.WhenAnyValue(x => x.Original)
        .Where(originalValue => originalValue != null)
        // ObserveOn? SubscribeOn? Which scheduler?
        .Select(originalValue => LongRunningCalculation(originalValue))
        // Same thing here: ObserveOn? SubscribeOn? Which scheduler? 
        .ToProperty(this, x => x.Derived, out _derived); // should I use the `scheduler:` in this method?

My problems are: I have no idea how these different "design choices" should be combined to get my desired responsive UI:

  • Which scheduler to use? I have seen examples with RxApp.TaskPoolScheduler, RxApp.MainThreadScheduler, NewThreadScheduler.Default, and possibly others.
  • When to use SubscribeOn vs ObserveOn, or even ObserveOnDispatcher or the scheduler: parameter of ToProperty?
  • Does the order make a difference? I have put the re-scheduling methods before and after the Select operator, but I'm not so sure. I'm not sure Select is even needed, to be frank.
  • I have seen some examples that set Binding.IsAsync to true, but I tried it and haven't seen much difference. Then again, maybe that is because of the other factors.
  • Are the concepts of SynchronizationContext and ThreadPriority relevant here? Is there a way to configure them in the code shown?
  • Should I be using ReactiveCommand or some other ReactiveUI class for this?

The most nerve-racking fact is that with some combinations the calculations work properly but block the UI, while with other combinations the values are calculated asynchronously and the UI is a bit less blocked, but sometimes part of the derived values (in a collection of items, for example) are not available!

Sorry if I'm asking too much, but I didn't find any authoritative expected way to do what I need in the docs.

Motherwell answered 17/7, 2017 at 14:8 Comment(0)

Schedulers

In Rx.NET there are a few schedulers, including a special one that's exclusive to WPF.

  • TaskPoolScheduler runs code on the task pool. This is a bit like running code inside a Task.
  • NewThreadScheduler spawns a new thread to run the code on. Generally don't use this scheduler unless you know that you "need" it (you almost never do).
  • DispatcherScheduler runs code on the UI thread. Use this when you're going to set your properties in the VM.

RxUI brings two platform agnostic scheduler abstractions. No matter what platform you're on (WPF, UWP, Xamarin.iOS, Xamarin.Android) RxApp.MainThreadScheduler will always refer to the UI thread scheduler, while RxApp.TaskPoolScheduler will refer to something akin to a background thread.

If you want to keep it simple, just use the RxApp schedulers; RxApp.MainThreadScheduler for UI stuff and RxApp.TaskPoolScheduler for background/heavy duty stuff.

ObserveOn/SubscribeOn

The name SubscribeOn() is a bit confusing, as it doesn't directly affect the Subscribe() method. SubscribeOn() decides which scheduler the observable will start on, that is, on which scheduler the original/first subscription will be done (not which scheduler the Subscribe() method will execute on). I like to think that SubscribeOn() moves up the observable chain to the top and makes sure the observable produces values on the given scheduler.

Some operators let you specify which scheduler they should run on. When they do, you should always prefer to pass a scheduler; that way you know where they're going to do work, and you prevent them from potentially blocking the UI thread (although they shouldn't). SubscribeOn() is kind of a "hack" for observables that don't let you specify a scheduler. If you use SubscribeOn() but the operator specifies a scheduler, the signals from the operator will be emitted on the operator's scheduler, not the one you specified in SubscribeOn().

ObserveOn() does much the same as SubscribeOn(), but it does it "from this point onwards". The operators and code following ObserveOn() will execute on the scheduler given to ObserveOn(). I like to think that ObserveOn() means "change thread to this one".
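
To make the "change thread to this one" idea concrete, here is a minimal console sketch (not ReactiveUI code). It assumes the System.Reactive package; ObserveOnSketch and Run are hypothetical names, and an EventLoopScheduler stands in for "some other thread" because a console app has no UI dispatcher.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ObserveOnSketch
{
    // Returns the managed thread ids of: the caller, the operator placed
    // before ObserveOn, and the operator placed after ObserveOn.
    public static (int Caller, int Before, int After) Run()
    {
        int caller = Thread.CurrentThread.ManagedThreadId;
        int before = -1;
        int after = -1;

        // Assumption: a dedicated single-threaded scheduler plays the role
        // of the target thread (in a real app this might be the UI scheduler).
        using (var worker = new EventLoopScheduler())
        {
            Observable.Return(42)
                // Runs on the thread that subscribed (the caller here).
                .Do(_ => before = Thread.CurrentThread.ManagedThreadId)
                .ObserveOn(worker)
                // Everything from here on runs on the worker's thread.
                .Do(_ => after = Thread.CurrentThread.ManagedThreadId)
                // Blocks the caller until the sequence completes.
                .Wait();
        }

        return (caller, before, after);
    }
}
```

In this sketch the first Do runs on the calling thread, while the second Do runs on the worker's thread, because only the operators after ObserveOn() are affected.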

Doing heavy work

If you are going to do heavy work, put that in a function and call that function, like what you've done with LongRunningCalculation(). You could put an ObserveOn(RxApp.TaskPoolScheduler) before the Select() and an ObserveOn(RxApp.MainThreadScheduler) after it, but I prefer to use Observable.Start() combined with SelectMany().

Observable.Start() is basically Observable.Return() for functions: "Give me the result of this function as an observable." You can also specify the scheduler it should call the function on.

SelectMany() ensures that we get the result of the observable, instead of the observable itself. (It's kind of like the await for observables: "don't execute this next operator before we have the result of this observable")
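
A small console sketch of that combination, outside of ReactiveUI, might look like the following. It assumes the System.Reactive package; StartSelectManySketch, Run and SlowSquare are hypothetical names, with SlowSquare standing in for LongRunningCalculation().

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class StartSelectManySketch
{
    // Hypothetical stand-in for a long-running calculation.
    static int SlowSquare(int x)
    {
        Thread.Sleep(50);
        return x * x;
    }

    public static int[] Run()
    {
        return Observable.Range(1, 3)
            // Each input starts its own calculation on the task pool;
            // SelectMany flattens the inner observables into one stream.
            .SelectMany(x => Observable.Start(
                () => SlowSquare(x),
                TaskPoolScheduler.Default))
            // Collect everything and block until the stream completes.
            .ToArray()
            .Wait();
    }
}
```

Note that SelectMany() merges the inner observables as they finish, so the results are not guaranteed to come back in input order. If only the most recent input matters, Select(...) followed by Switch() is a commonly used alternative, since Switch() unsubscribes from the previous inner observable when a new one arrives.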

Derived properties

You are doing the derived property correctly.

Use WhenAnyValue() to get the changes of the property and pipe that to a ToProperty(). The operators you put in between may do work on background threads and delay the setting of the derived property, but that's why we have INotifyPropertyChanged.

My take

Here's how I would implement your specific example:

public TOriginal Original
{
    get { return _original; }
    set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;


public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<TDerived> _derived;


_derived = this.WhenAnyValue(x => x.Original)
    .Where(originalValue => originalValue != null)
    // Specify the scheduler to the operator directly
    .SelectMany(originalValue =>
        Observable.Start(
            () => LongRunningCalculation(originalValue),
            RxApp.TaskPoolScheduler))
    .ObserveOn(RxApp.MainThreadScheduler)
    // I prefer this overload of ToProperty, which returns an ObservableAsPropertyHelper
    .ToProperty(this, x => x.Derived);

We have a Slack team for ReactiveUI which you are welcome to join. You can ask for an invite by clicking here

Supposing answered 17/7, 2017 at 19:51 Comment(3)
Thank you very, very much for this answer, it was very enlightening. – Motherwell
Generally a nice answer, except for this: "I like to think that SubscribeOn() moves up the observable chain to the top and makes sure the observable produces values on the given scheduler." I have given a set of test cases in my answer that show that in general this is not true, even though in some cases it may appear so. – Latrice
You're right. If the operator uses another scheduler or creates a new thread (as you show in your tests), it will signal on that scheduler/thread. I've updated my answer, but your tests are much more detailed. – Kamalakamaria

Before a Select that might block the UI, observe on the TaskPoolScheduler. Before the ToProperty, observe on the MainThreadScheduler.

  this.WhenAnyValue(x => x.Original)
        .Where(originalValue => originalValue != null)
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(originalValue => LongRunningCalculation(originalValue))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.Derived, out _derived); 

Additionally

People are very confused as to what SubscribeOn actually does. There are many explanations. For example as given in another answer here

SubscribeOn moves up the observable chain to the top and make sure the observable produces values on the given scheduler

This is just not true. It is instructive to look at the implementation of SubscribeOn in the RX code base. You have to jump through several layers of abstraction to get there but eventually you find.

public static IObservable<TSource> 
    SubscribeOn<TSource>
   ( IObservable<TSource> source
   , IScheduler scheduler
   )
{
  if (source == null)
    throw new ArgumentNullException("source");
  if (scheduler == null)
    throw new ArgumentNullException("scheduler");
  return (IObservable<TSource>) new AnonymousObservable<TSource>((Func<IObserver<TSource>, IDisposable>) (observer =>
  {
    SingleAssignmentDisposable assignmentDisposable = new SingleAssignmentDisposable();
    SerialDisposable d = new SerialDisposable();
    d.Disposable = (IDisposable) assignmentDisposable;
    assignmentDisposable.Disposable = scheduler.Schedule((Action) (() => d.Disposable = (IDisposable) new ScheduledDisposable(scheduler, source.SubscribeSafe<TSource>(observer))));
    return (IDisposable) d;
  }));
}

The only thing this does is ensure that the Subscribe method on source is called on the specified scheduler and that the Dispose method on the disposable returned by the same Subscribe method is also called on the specified scheduler. The effect that this has on the downstream code is varied.

For example

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace SubscribeOnVsObserveOn
{
    class Program
    {
        static readonly Subject<object> EventsSubject = new Subject<object>();

        private static readonly IObservable<object> Events = Observable.Create<object>
            ( observer =>
            {
                Info( "Subscribing"  );
                return EventsSubject.Subscribe( observer );
            } );

        public static void Info(string msg)
        {
            var currentThread = Thread.CurrentThread;
            var currentThreadName = string.IsNullOrWhiteSpace( currentThread.Name ) ? "<no name>" : currentThread.Name;
            Console.WriteLine
                ( $"Thread Id {currentThread.ManagedThreadId} {currentThreadName} - " + msg );
        }

        public static void  Foo()
        {
            Thread.CurrentThread.Name = "Main Thread";

            Info( "Starting"  );

            void OnNext(object o) => Info( $"Received {o}" );

            void Notify(object obj)
            {
                Info( $"Sending {obj}"  );
                EventsSubject.OnNext( obj );
            }

            void StartAndSend(object o, string threadName)
            {
                var thread = new Thread(Notify);
                thread.Name = threadName;
                thread.Start(o);
                thread.Join();
            }

            Notify(1);

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe Only" );
            Console.WriteLine("=============================================" );
            using (Events.Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(CurrentThreadScheduler)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( CurrentThreadScheduler.Instance ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(ThreadPool)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( ThreadPoolScheduler.Instance ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(NewThread)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( NewThreadScheduler.Default ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(NewThread) + ObserveOn" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( NewThreadScheduler.Default ).ObserveOn(TaskPoolScheduler.Default  ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }
        }




        static void Main(string[] args)
        {
            Foo();
            Console.WriteLine( "Press Any Key" );
            Console.ReadLine();
        }
    }
}

generates the following output

Thread Id 1 Main Thread - Starting
Thread Id 1 Main Thread - Sending 1
=============================================
Subscribe Only
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 4 A - Sending 2
Thread Id 4 A - Received 2
Thread Id 5 B - Sending 3
Thread Id 5 B - Received 3
=============================================
Subscribe With SubscribeOn(CurrentThreadScheduler)
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 6 A - Sending 2
Thread Id 6 A - Received 2
Thread Id 7 B - Sending 3
Thread Id 7 B - Received 3
=============================================
Subscribe With SubscribeOn(ThreadPool)
=============================================
Thread Id 8 <no name> - Subscribing
Thread Id 10 A - Sending 2
Thread Id 10 A - Received 2
Thread Id 11 B - Sending 3
Thread Id 11 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread)
=============================================
Thread Id 12 <no name> - Subscribing
Thread Id 13 A - Sending 2
Thread Id 13 A - Received 2
Thread Id 14 B - Sending 3
Thread Id 14 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread) + ObserveOn
=============================================
Thread Id 16 <no name> - Subscribing
Thread Id 17 A - Sending 2
Thread Id 19 B - Sending 3
Thread Id 18 <no name> - Received 2
Thread Id 18 <no name> - Received 3
Press Any Key

The takeaway is that SubscribeOn can force neither the sending nor the receiving of events onto a specific scheduler. It can only force the Subscribe method to be called on a specific scheduler. This may or may not have downstream/upstream effects.

Latrice answered 17/7, 2017 at 14:46 Comment(19)
Wouldn't it be better to use a SubscribeOn instead of the first ObserveOn to push it onto a different thread? – Heyday
SubscribeOn is rarely used. It only makes sense when the act of subscribing does something weird before the first event is received. Mostly you only use ObserveOn. That being said, see #7579737 for more info. – Latrice
Using SubscribeOn both before and after Select made the UI work quite like I would expect. But using ObserveOn as originally suggested blocked the UI. – Motherwell
You can always check which thread your work is occurring on. Set a breakpoint and then check the Threads window. If your work is still on the main thread then you have done something wrong. You should post the exact code you tried. – Latrice
Regarding "solving my problem" vs "understanding what is happening": even though I got good responsiveness with the SubscribeOn alternative, I feel that I'm no better off than before. – Motherwell
With both alternatives I get the work done in a WorkerThread, but with ObserveOn the UI is blocked, and when I break there are a lot more simultaneous WorkerThreads running. It almost feels like the more parallelism/concurrency I get, the more the UI feels frozen until everything gets processed. – Motherwell
Sounds like your property is changing faster than the calculation can process results. You might want to try Throttle or Sample if you don't need results at the rate the input is changing. – Latrice
If you do need all the results, and the overhead of task switching is too high at the rate you are generating data, then you could also try batching the input. Use Buffer or Window for that. – Latrice
Throttle seems ideal for me. Where do you suggest putting it? At the beginning of the chain? – Motherwell
Yes. But be aware that if your data changes at a constant rate, Throttle will never return a result if the throttle timeout is greater than the data period, and will always return results if it is less than the data period. Throttle is for burst data, like when a user starts typing a search and you wait for the keyboard to go quiet. Sample is better for constant-rate data. – Latrice
Sorry to correct you, @Latrice: SubscribeOn defines on which scheduler the pipeline should be processed, whereas ObserveOn defines the scheduler the actual subscription is executed on. The naming is counterintuitive. – Heyday
As I said, SubscribeOn is the scheduler where "subscriptions" occur. ObserveOn is the scheduler where output is delivered. – Latrice
@Motherwell I noticed you asked a new question based on my answer. You should accept this one if it helped you progress. :) – Latrice
@Latrice Still, your example above using ObserveOn won't help at all; you have to use SubscribeOn in that place. – Heyday
The long-running calculation will occur on the thread pool and the results will be on the main thread. Try it if you don't believe it. – Latrice
#20452439 – Latrice
Thank you very much for this answer, but I found the explanation in the other answer more complete. – Motherwell
Except the answer he gave about SubscribeOn is wrong. People are always confused by SubscribeOn but don't actually check the source code to see what it does. It does not move up the observable chain to the top and make sure the observable produces values on the given scheduler. The only thing it does is dispatch the calls to Subscribe and Dispose onto another scheduler. Nothing more than that. The effect that this has really depends on what the IObservable directly upstream is doing. I will update my answer with the source code of SubscribeOn so that people can be clear. – Latrice
I've updated my answer with a very detailed set of examples showing how SubscribeOn works. – Latrice
