I'm trying to wrap my head around Reactive Extensions' support for concurrency and am having a hard time getting the results I'm after. So I may not get it just yet.
I have a source that emits data into the stream faster than the subscriber can consume it. I'd prefer to configure the stream such that another thread is used to invoke the subscriber for each new item from the stream, so that the subscriber has multiple threads running through it concurrently. I am able to ensure the thread-safeness of the subscriber.
The following sample demonstrates the problem:
Observable.Interval( TimeSpan.FromSeconds(1))
.Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
The console output looks like this (dates removed):
4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6
Please notice the "Observed value" time deltas. The subscriber isn't invoked in parallel even though the source continues to emit data faster than the subscriber can process it. While I can imagine a bunch of scenarios where the current behavior would be useful, I need to be able to process the messages as soon as they become available.
I've tried several variations of Schedulers with the ObserveOn method, but none of them seem to do what I want.
Other than spinning off a thread within the Subscribe action to perform the long running work, is there anything I'm missing that will allow for concurrent delivery of data to the subscriber?
Thanks in advance for all answers and suggestions!