Reactive Extensions: Concurrency within the subscriber
I'm trying to wrap my head around Reactive Extensions' support for concurrency and am having a hard time getting the results I'm after. So I may not get it just yet.

I have a source that emits data into the stream faster than the subscriber can consume it. I'd prefer to configure the stream so that a separate thread invokes the subscriber for each new item, meaning the subscriber has multiple threads running through it concurrently. I am able to ensure the thread safety of the subscriber.

The following sample demonstrates the problem:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });

The console output looks like this (dates removed):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6

Please notice the "Observed value" time deltas. The subscriber isn't invoked in parallel even though the source continues to emit data faster than the subscriber can process it. While I can imagine a bunch of scenarios where the current behavior would be useful, I need to be able to process the messages as soon as they become available.

I've tried several variations of Schedulers with the ObserveOn method, but none of them seem to do what I want.

Other than spinning off a thread within the Subscribe action to perform the long running work, is there anything I'm missing that will allow for concurrent delivery of data to the subscriber?

Thanks in advance for all answers and suggestions!

Marlie answered 20/5, 2013 at 21:45 Comment(1)
It's been a while since I last used Rx, but wouldn't it be better to avoid manual thread handling? That is, use TPL to spawn a background task in the Subscribe method, which is then able to return immediately. As far as I can tell, this would solve your concurrency problems AND avoid the risk of spawning a lot of threads (which would otherwise happen if your source is faster than your subscribers). - Jarrettjarrid

The fundamental problem here is that you want the Rx observable to dispatch events in a way that breaks the rules of how observables work. I think it would be instructive to look at the Rx design guidelines here: http://go.microsoft.com/fwlink/?LinkID=205219 - most notably, "4.2 Assume observer instances are called in a serialized fashion". That is, you're not meant to be able to run OnNext calls in parallel. In fact, the ordering behaviour of Rx is pretty central to its design philosophy.

If you look at the source, you'll see that Rx enforces this behaviour in the ScheduledObserver<T> class, from which ObserveOnObserver<T> is derived. OnNext calls are dispatched from an internal queue, and each must complete before the next one is dispatched, within the given execution context. Rx won't allow an individual subscriber's OnNext calls to execute concurrently.

That's not to say you can't have multiple subscribers executing at different rates, though. In fact, this is easy to see if you change your code as follows:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now,
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default);

var subscription1 = source.Subscribe(x =>
    {
        Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
                            DateTime.Now,
                            Thread.CurrentThread.ManagedThreadId, x);
        Thread.Sleep(1000); // Simulate long work time
    });

var subscription2 = source.Subscribe(x =>
{
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
                        DateTime.Now,
                        Thread.CurrentThread.ManagedThreadId, x);
    Thread.Sleep(5000); // Simulate long work time
});

Now you'll see Subscriber 1 getting ahead of Subscriber 2.

What you can't easily do is ask an observable to dispatch an OnNext call to whichever subscriber is "ready", which is kind of what you are asking for in a roundabout way. I also presume you wouldn't really want to create a new thread for every OnNext in a slow-consumer situation!

In this scenario it sounds like you might be better off with a single subscriber that does nothing other than push work onto a queue as fast as possible, which is in turn serviced by a number of consuming worker threads you could then control as necessary to keep pace.
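One way to sketch that idea, assuming the System.Reactive package and a BlockingCollection<T> as the queue (the Drain helper, the worker count of 4, and the Take(8) cut-off are hypothetical choices for illustration, not part of the original answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class QueueFanOutSketch
{
    // Hypothetical helper: blocks until the source completes and every queued
    // item has been handled, then returns the number of items handled.
    // Error handling is omitted; a faulting source would also need an onError
    // handler that calls CompleteAdding so the workers can exit.
    public static int Drain<T>(IObservable<T> source, int workerCount, Action<T> work)
    {
        var handled = 0;

        // Unbounded by default; pass a capacity to make Add block when full.
        using var queue = new BlockingCollection<T>();

        // Each worker blocks in GetConsumingEnumerable until an item arrives
        // and leaves the loop once CompleteAdding is called and the queue is empty.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    work(item);
                    Interlocked.Increment(ref handled);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // The single subscriber only enqueues, so each OnNext returns quickly
        // and Rx's serialized delivery never becomes the bottleneck.
        using var subscription = source.Subscribe(
            item => queue.Add(item),
            () => queue.CompleteAdding());

        Task.WaitAll(workers);
        return handled;
    }

    public static void Main()
    {
        var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(8);

        var count = Drain(source, workerCount: 4, work: x =>
        {
            Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                              DateTime.Now,
                              Thread.CurrentThread.ManagedThreadId, x);
            Thread.Sleep(5000); // Simulate long work time
        });

        Console.WriteLine("Handled {0} items", count);
    }
}
```

Note that items are still enqueued in source order, but with several workers they may finish out of order, so this only suits work that does not depend on sequence.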

Sudduth answered 21/5, 2013 at 0:8 Comment(5)
Thanks James. Your explanation makes perfect sense. Thanks for the pointer to the Design Guidelines... it covers ground that I haven't seen in books and blogs. - Marlie
Welcome! As an aside, I wrote about an efficient approach for dropping events in order to allow a consumer to stay current here: zerobugbuild.com/?p=192 It could be relevant for some scenarios. - Sudduth
You can also do stuff like source.Select(x => Observable.Defer(() => HandleAsync(x))).Merge(5).Subscribe(...) where HandleAsync(x) is a method which handles that one item and returns a Task to signal when it is done. This pattern is good when your source sometimes bursts faster than your subscriber can process. The above will observe up to 5 results concurrently. FYI: inside the Subscribe call, you are actually observing the results of the Tasks. - Syllepsis
If you want to "fan out" you can use the old ..SelectMany(i=>Observable.Start(()=>DoSomething(i)).. - Pandurate
I stumbled onto this Q&A, but I was looking for exactly what @JamesWorld had written up. Here's a gist with a modified version that uses dematerialized streams - gist.github.com/aniongithub/c650636189b68d0b1ece3e020bc51329 - Aloes
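
For reference, the Merge(5) pattern suggested in the comments above could look roughly like the following. This is a minimal sketch assuming the System.Reactive package; it swaps in Observable.FromAsync for the Defer call in the comment (FromAsync also defers starting the task until subscription), and the Pipeline helper and the body of HandleAsync are hypothetical stand-ins.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedConcurrencySketch
{
    // Hypothetical helper: wraps each item in a deferred async operation and
    // lets Merge subscribe to (and so start) at most maxConcurrent at a time.
    public static IObservable<TResult> Pipeline<T, TResult>(
        IObservable<T> source,
        Func<T, Task<TResult>> handleAsync,
        int maxConcurrent)
    {
        return source
            .Select(x => Observable.FromAsync<TResult>(() => handleAsync(x)))
            .Merge(maxConcurrent);
    }

    // Hypothetical handler standing in for the HandleAsync named in the comment.
    private static async Task<long> HandleAsync(long x)
    {
        Console.WriteLine("{0} Thread: {1} Started value: {2}",
                          DateTime.Now,
                          Thread.CurrentThread.ManagedThreadId, x);
        await Task.Delay(TimeSpan.FromSeconds(5)); // Simulate long work time
        return x;
    }

    public static void Main()
    {
        using var done = new ManualResetEventSlim();

        var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);

        // The subscriber sees task results, still delivered one at a time;
        // the concurrency happens inside the handlers, not inside OnNext.
        using var subscription = Pipeline(source, HandleAsync, maxConcurrent: 5)
            .Subscribe(
                x => Console.WriteLine("{0} Finished value: {1}", DateTime.Now, x),
                () => done.Set());

        done.Wait();
    }
}
```

Results arrive in completion order rather than source order, and items beyond the concurrency limit are buffered by Merge until a slot frees up.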
