Why is IEnumerable.ToObservable so slow?

I am trying to enumerate a large IEnumerable once, and observe the enumeration with various operators attached (Count, Sum, Average etc). The obvious way is to transform it to an IObservable with the method ToObservable, and then subscribe an observer to it. I noticed that this is much slower than other methods, like doing a simple loop and notifying the observer on each iteration, or using the Observable.Create method instead of ToObservable. The difference is substantial: it's 20-30 times slower. Is this just how it is, or am I doing something wrong?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Output:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, Console App, Release build


Update: Here is an example of the actual functionality I want to achieve:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Output:

Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5

The important difference of this approach compared to using standard LINQ operators, is that the source enumerable is enumerated only once.


One more observation: using ToObservable(Scheduler.Immediate) is slightly faster (about 20%) than ToObservable().

Amok answered 2/4, 2020 at 8:42 Comment(17)
A 1-time measurement isn't all too reliable. Consider setting up a benchmark with BenchmarkDotNet for example. (Not affiliated)Waine
@Waine the BenchmarkDotNet is more useful for microbenchmarks, or for measuring small differences that occur with high variability. For this case a Stopwatch is probably enough.Amok
@TheodorZoulias There's more to it than that, for example, I would question your benchmark as it currently stands as the order of execution within that single run could be causing large differences.Meiosis
Stopwatch may be enough, if you gathered statistics. Not just a single sample.Waine
@Waine I just ran the test again with reverse order: Method3(COUNT); Method2(COUNT); Method1(COUNT);. I got similar results.Amok
I think initialization of the enumerable should be moved outside of the functions for the sake of accuracy.Felodese
I am not saying you will see a completely different result. It just will be more reliable.Waine
@Felodese Init is outside measurement. (If you are talking about the line var source = Enumerable.Range(0, count);)Waine
@Felodese I just tested your suggestion. I passed the same enumerable to all three methods. The results are similar.Amok
@Waine - The results are correct and they are expected.Trusty
@Trusty I am not saying they are INcorrect. I'd just roll a decent benchmark to be sure.Waine
@Waine I am not interested if the ToObservable is exactly 24.8 or 25.2 times slower. It doesn't make any difference for my use case. In both cases I am inclined not to use it, and use one of the other methods instead.Amok
@Waine - Fair enough. I mean that the figures are representative of what one should expect.Trusty
@TheodorZoulias - It would be a mistake not to use .ToObservable() for the reasons I outlined in my answer. Speed is not the goal here.Trusty
@Trusty Agreed.Waine
@TheodorZoulias - Nice question, btw.Trusty
@TheodorZoulias - This is also one of the reasons why I say avoid Observable.Create and especially so if you end up doing a return Disposable.Empty;.Trusty
T
9

This is the difference between a well behaved observable and a "roll-your-own-because-you-think-faster-is-better-but-it-is-not" observable.

When you dive down far enough in the source you discover this lovely little line:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

This is effectively calling hasNext = enumerator.MoveNext(); once per scheduled recursive iteration.

This allows you to choose the scheduler for your .ToObservable(schedulerOfYourChoice) call.
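
As a rough illustration of that scheduler choice, here is a minimal sketch that drains the same enumerable through ToObservable on different schedulers. It assumes the System.Reactive package is referenced; the Drain helper and the class name are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerChoice
{
    // Hypothetical helper: subscribes on the given scheduler and blocks
    // the caller until the sequence completes, collecting every value.
    public static List<int> Drain(IEnumerable<int> source, IScheduler scheduler)
    {
        var results = new List<int>();
        using var done = new ManualResetEventSlim();
        source.ToObservable(scheduler)
              .Subscribe(results.Add, () => done.Set());
        done.Wait(); // needed when the scheduler runs on another thread
        return results;
    }

    public static void Main()
    {
        var source = Enumerable.Range(0, 5);

        // Each MoveNext/OnNext pair is dispatched through the chosen scheduler.
        Console.WriteLine(string.Join(",", Drain(source, Scheduler.Immediate)));
        Console.WriteLine(string.Join(",", Drain(source, Scheduler.CurrentThread)));
        Console.WriteLine(string.Join(",", Drain(source, TaskPoolScheduler.Default)));
    }
}
```

The values arrive in the same order in all three cases; what changes is where and when each iteration step runs, which is also where the per-item overhead comes from.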

With the other options you've chosen you've created a bare-bones series of calls to .OnNext that do virtually nothing. Method2 doesn't even have a .Subscribe call.

Both Method2 and Method3 run on the current thread, and both run to completion before the subscription is finished. They are blocking calls. They can cause race conditions.

Method1 is the only one that behaves nicely as an observable. It is asynchronous and it can run independently of the subscriber.

Do keep in mind that observables are collections that run over time. They typically have an async source or a timer, or they respond to external stimuli. They don't often run off a plain enumerable. If you're working with an enumerable, then working synchronously should be expected to run faster.

Speed is not the goal of Rx. Performing complex queries on time-based, pushed values is the goal.

Trusty answered 2/4, 2020 at 9:5 Comment(15)
"roll-your-own-because-you-think-faster-is-better-but-it-is-not" - excellent!!Waine
Thanks Enigmativity for the detailed answer! I updated my question with an example of what I actually want to achieve, which is a calculation synchronous in nature. Do you think that instead of Reactive extensions I should search for another tool, given that performance is critical in my case?Amok
@TheodorZoulias - Here's the enumerable way to do your example in your question: source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count }). One iteration only and over 10x faster than Rx.Trusty
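
For readers who want to try the single-pass Aggregate idea from the comment above, here is a self-contained sketch. It uses value tuples instead of anonymous types and a long counter; the SinglePassStats class, the Compute method, and the NaN result for an empty source are choices made for this example, not part of the original comment.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SinglePassStats
{
    // Computes count, sum and average while enumerating the source once.
    public static (long Count, long Sum, double Average) Compute(IEnumerable<long> source)
    {
        var (count, sum) = source.Aggregate(
            (Count: 0L, Sum: 0L),
            (acc, x) => (acc.Count + 1, acc.Sum + x));

        // Assumption of this sketch: an empty source yields NaN as the average.
        var average = count == 0 ? double.NaN : (double)sum / count;
        return (count, sum, average);
    }

    public static void Main()
    {
        var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
        var (count, sum, average) = Compute(source);
        Console.WriteLine($"Count: {count:#,0}, Sum: {sum:#,0}, Average: {average:#,0.0}");
    }
}
```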
I just tested it, and it's faster indeed, but only about x2 faster (compared to RX without ToObservable). This is the other extreme, where I have the best performance but I am forced to re-implement every LINQ operator inside a complex lambda expression. It is error prone and less maintainable, considering that my actual calculations involve even more operators, and combinations of them. I think that it's quite tempting to pay a x2 performance price for having a clear and readable solution. On the other hand paying x10 or x20, not so much!Amok
Perhaps if you posted exactly what you're trying to do I could suggest an alternative?Trusty
TBH my actual case is in the realm of imagination for the time being. 😃 I am trying to understand which tools are more suitable for different scenarios, and the requirement for single enumeration seems to come up quite frequently.Amok
@TheodorZoulias - Keep in mind that the Rx way of having a single iteration causes multiple iterations of the subjects. You haven't removed any iterations in practice at all.Trusty
Could you elaborate on that? My understanding is that a Subject is just a list of subscribers (observers), and every time it gets a notification it propagates it to its subscribers. Is it more than this?Amok
@TheodorZoulias - No, that's exactly it. Say you're doing 4 calculations, so you could iterate your enumerable 4 times to do it. Or you could attach 4 observers to a subject and iterate your enumerable once. Either way it is n * 4 values that you're running through: n times 4 observers or n times 4 iterations.Trusty
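
The n * 4 arithmetic can be checked with a small sketch that uses only the BCL IObserver<T> interface. FanOut<T> is a hypothetical, simplified stand-in for a subject (it is not Rx's Subject<T>), and CallCounter<T> is likewise an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a subject: forwards each notification to every observer.
public sealed class FanOut<T> : IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    public void Add(IObserver<T> observer) => _observers.Add(observer);
    public void OnNext(T value) { foreach (var o in _observers) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in _observers) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers) o.OnCompleted(); }
}

// Counts how many OnNext notifications it receives.
public sealed class CallCounter<T> : IObserver<T>
{
    public long Calls { get; private set; }
    public void OnNext(T value) => Calls++;
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class FanOutDemo
{
    // Enumerates n items once and returns the total OnNext calls across all observers.
    public static long TotalCalls(int n, int observers)
    {
        var fanOut = new FanOut<int>();
        var counters = Enumerable.Range(0, observers)
                                 .Select(_ => new CallCounter<int>())
                                 .ToList();
        foreach (var counter in counters) fanOut.Add(counter);

        foreach (var item in Enumerable.Range(0, n)) fanOut.OnNext(item); // single enumeration
        fanOut.OnCompleted();
        return counters.Sum(c => c.Calls);
    }

    public static void Main()
    {
        Console.WriteLine(TotalCalls(1_000, 4)); // prints 4000
    }
}
```

The source is pulled only once, but the per-item work still scales with the number of observers, which is the point both sides of the exchange agree on.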
I want to avoid enumerating the source more than once because the data may not be stored in memory. Instead could be fetched one at a time from the filesystem or a database. In this case each enumeration could even yield different results, making my computed values inconsistent.Amok
@TheodorZoulias - Then the observable approach is good. It is an ephemeral way to iterate through a list of items. You could easily create an observable directly from File.ReadLines or a DB query and never actually have the values in an enumerable in the first place.Trusty
Yeap, the File.ReadLines is exactly the method I had in mind. It returns an IEnumerable<string> though. So we are back to the question of how to make an IObservable out of an IEnumerable efficiently. :-)Amok
@TheodorZoulias - Try this way of reading a file with pure observables then: dotnetfiddle.net/PB9w9W.Trusty
I tried to test your Using-Defer-Repeat-TakeWhile solution, and ran into a strange behavior.Amok
I just tested it, after adding ObserveOn(Scheduler.CurrentThread) as a workaround for the aforementioned strange behavior, and the results aren't promising. It has five times more overhead than ToObservable, which is already slow.Amok
B
-1

Because the Subject does nothing.

It is like comparing the performance of the loop statement in these 2 cases:

for(int i=0;i<1000000;i++)
    total++;

or

for(int i=0;i<1000000;i++)
    DoHeavyJob();

If you use another Subject with a slow OnNext implementation, the results become more comparable:

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    class My_Slow_Subject : SubjectBase<int>
    {

        public override void OnNext(int value)
        {
            // simulate a job that takes about 3 ms
            System.Threading.Thread.Sleep(3);
        }


        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver<int> observer) 
                => throw new NotImplementedException();
    }

    static SubjectBase<int> CreateSubject()
    {
        return new My_Slow_Subject();
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Output

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

ToObservable supports System.Reactive.Concurrency.IScheduler.

That means you can implement your own IScheduler and decide when to run each task.

Hope this helps

Regards

Budge answered 2/4, 2020 at 9:28 Comment(2)
You do realize OP is talking explicitly about COUNT values 100,000x higher magnitude?Waine
Thanks BlazorPlus for the answer. I have updated my question adding a more realistic example of my use case. The subject is observed by other operators that perform calculations, so it's not doing nothing. The performance penalty of using ToObservable is still substantial, because the calculations are very light.Amok
