How to Separate IObservable and IObserver
A

3

6

Update: check out the example at the bottom

I need to message between classes. The publisher will loop indefinitely, call some method to get data, and then pass the result of that call into OnNext. There can be many subscribers, but there should only ever be one IObservable, and one long-running task. Here is an implementation.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            var subject = new Subject<string>();

            //Create a class and inject the subject as IObserver
            new Publisher(subject);

            //Create a class and inject the subject as IObservable
            new Subscriber(subject, 1.ToString());
            new Subscriber(subject, 2.ToString());
            new Subscriber(subject, 3.ToString());

            //Run the loop for 3 seconds
            await Task.Delay(3000);
        }

        class Publisher
        {
            public Publisher(IObserver<string> observer)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        observer.OnNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            public string Name;

            //Listen for OnNext and write to the debug window when it happens
            public Subscriber(IObservable<string> observable, string name)
            {
                Name = name;
                var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
            }
        }
    }
}

Output:

Name: 1 Message: Hi

Name: 2 Message: Hi

Name: 3 Message: Hi

Name: 1 Message: Hi

Name: 2 Message: Hi

Name: 3 Message: Hi

This works fine. Notice that only one IObserver sends messages, but all subscriptions pick up the message. But how do I separate the IObservable and the IObserver? They are glued together as a Subject. Here is another approach.

[TestMethod]
public async Task RunMessagingAsync2()
{
    var observers = new List<IObserver<string>>();

    var observable = Observable.Create(
    (IObserver<string> observer) =>
    {
        observers.Add(observer);

        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    observer.OnNext(GetSomeData());
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                await Task.Delay(500);
            }
        });

        return Disposable.Create(() => { });
    });

    //Create two subscribers and inject the observable as IObservable
    new Subscriber(observable, 1.ToString());
    new Subscriber(observable, 2.ToString());

    //Run the loop for 10 seconds
    await Task.Delay(10000);

    Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}

The problem here is that this creates two separate Tasks and two separate IObservers. Every subscription creates a new IObserver. You can confirm that because the Assert here fails. This doesn't really make any sense to me. From what I understand of Reactive programming, I wouldn't expect the Subscribe method here to create a new IObserver each time. Check out this gist. It is a slight modification of the Observable.Create example. It shows how the Subscribe method causes an IObserver to be created each time it is called. How can I achieve the functionality from the first example without using a Subject?
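
To make the per-subscription behaviour concrete, here is a minimal sketch that counts how often the delegate passed to Observable.Create runs. It assumes the System.Reactive package is referenced. The class and method names (ColdCreateDemo, CountDelegateRuns) are made up for illustration, and the work is done synchronously so the result is deterministic.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class ColdCreateDemo
{
    // Returns how many times the Observable.Create delegate ran
    // after subscribing twice to the same observable.
    public static int CountDelegateRuns()
    {
        var runs = 0;

        var cold = Observable.Create<string>(observer =>
        {
            // This body is executed once per Subscribe call.
            runs++;
            observer.OnNext("Hi");
            observer.OnCompleted();

            // All work finished synchronously above, so there is nothing left to cancel.
            return Disposable.Empty;
        });

        using (cold.Subscribe(_ => { }))
        using (cold.Subscribe(_ => { }))
        {
        }

        return runs;
    }
}
```

With two subscriptions the delegate should run twice, which is the same effect as the two Tasks started in the example above.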

Here is another approach that does not use Reactive UI at all... You could create a Subject from the publisher if you want to, but it is not necessary.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";
   
        class Publisher
        {
            public Publisher(Action<string> onNext)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        onNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            //Listen for OnNext and write to the debug window when it happens
            public void ReceiveMessage(string message) => Debug.WriteLine(message);
        }

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            //Create the subscriber
            var subscriber = new Subscriber();

            //Create the publisher and hand it the subscriber's callback
            new Publisher(subscriber.ReceiveMessage);

            //Run the loop for 10 seconds
            await Task.Delay(10000);
        }
    }
}

Lastly, I should add that ReactiveUI used to have a MessageBus class. I'm not sure if it got removed or not, but it is no longer recommended. What do they suggest we use instead?

Working Example

This version is correct. I guess the only thing I'm asking now is how do I do the equivalent of this with Observable.Create? The problem with Observable.Create is that it runs the action for each subscription. That is not the intended functionality. The long running task here only runs once no matter how many subscriptions there are.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    internal class BasicObservable<T> : IObservable<T>
    {
        List<IObserver<T>> _observers = new List<IObserver<T>>();

        public BasicObservable(
            Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default
            ) =>

            Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
                        var data = getData();
                        _observers.ForEach(o => o.OnNext(data));
                    }
                    catch (Exception ex)
                    {
                        _observers.ForEach(o => o.OnError(ex));
                    }
                }

                _observers.ForEach(o => o.OnCompleted());

            }, cancellationToken);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _observers.Add(observer);
            return Disposable.Create(observer, (o) => _observers.Remove(o));
        }
    }

    public static class ObservableExtensions
    {
        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, default, cancellationToken);

        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, interval, cancellationToken);
    }

    [TestClass]
    public class UnitTest1
    {
        string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            Func<string> getData = GetData;

            var publisher = getData.CreateObservable(cancellationToken);

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; true; i++)
            {
                if (i >= 5)
                {
                    cancellationSource.Cancel();
                    //Stop looping once cancelled, otherwise the test never finishes
                    break;
                }

                await Task.Delay(1000);
            }
        }
    }

}
Alit answered 23/10, 2020 at 23:14 Comment(20)
Is it an option to use AsObservable() to "separate" the components by hiding the source? – Roam
Can you point me to an example usage? This works: new Subscriber(subject.AsObservable()); but I don't see how it solves my problem. – Alit
You use AsObservable() on the Subject; it returns an IObservable you can use normally, but you don't have "access" to the original source. But why is using a Subject directly an issue? You work against the IObservable interface anyway. – Roam
AsObservable() doesn't really achieve anything other than hiding the identity of the original. "But why is using a Subject directly an issue" Turn that question around. I have used the pattern above with Subject for messaging. Is this the recommended approach? I thought we should separate the Observer and the Observable. – Alit
If I am just going to smash the two things together, why bother with Rx UI in the first place? I could just directly hook the two classes together externally. – Alit
I've edited the original post to include an example that doesn't require ReactiveUI. In my opinion, this design is better. Why would I use Reactive UI if I can do it like this? – Alit
Your assertion that only one IObserver is created regardless of how many subscribers there are is probably false. Each time you call the Subscribe method with an Action<T> onNext lambda as argument, a new AnonymousObserver<T> is created implicitly behind the scenes. This Subscribe is a utility extension method, to make subscribing easier. – Bid
I didn't make that assertion. In fact, that's what I am pointing out. You can see the manifestation of this in my second example. – Alit
Why does Observable.Subscribe create multiple observers behind the scenes? As an outsider here, I wouldn't expect the number of subscriptions to affect the number of IObservers in existence. There might be some cases where you would need that, but it seems strange to do it by default. It doesn't seem like the way most people would want to use the pattern. – Alit
"Notice that only one IObserver is created regardless of how many subscribers there are." <=== I am talking about this assertion. The bottom line is that each subscription is associated with a different observer. Subscribing with the same observer multiple times doesn't make sense. What should happen with the OnError and OnCompleted notifications in this case? Would you like to see a single observer receiving multiple OnCompleted notifications? This goes against the RX conventions. – Bid
I apologize. You are correct. Subscribe does create a new IObserver each time Subscribe() is called, even on a Subject. However, in my first example, one IObserver controls all subscriptions. That is the difference. That is the functionality I need to achieve. – Alit
In the first example, one Subject controls all subscriptions, but the subscriptions are associated with the IObservable nature of the Subject, not its IObserver nature. Maybe you are confused by the dual nature of the Subject<T> class? – Bid
The first example works well. It gets the functionality I need. But it probably doesn't fit with the RX UI pattern. I feel like I'm trying to push a square peg into a round hole here. Should I even be trying to use RX UI for this? Forget RX UI for this minute... This is really about the pub/sub pattern. I want to publish messages, and then multiple subscribers should be able to pick up that message. The same could be done with plain old events and event handlers in C#. However, I'm now trying to achieve the same with RX UI. – Alit
I am not familiar with the RX UI. Also I have not understood what you are trying to achieve. Do you want the functionality of the Subject without explicitly using a Subject? In that case you should look at the Publish and Replay operators (that use different ISubject flavors behind the scenes). – Bid
I had a look at these methods but I can't find a good example anywhere. Can you point me to one? Here is another example of what I am trying to achieve. gist.github.com/MelbourneDeveloper/… . It's not complicated. One publisher, multiple subscribers. Hopefully without having to implement these interfaces myself. – Alit
And another example: gist.github.com/MelbourneDeveloper/… . Essentially, I just want to know how to use Observable.Create to create the same thing as this Publisher class. – Alit
So if I understand correctly, you want to know how to use the Observable.Create method in order to create observables that can be subscribed to by multiple observers, and that can send notifications to all of their subscribed observers. Correct? – Bid
Yes, but also not run a loop for each observer. Check out the latest example at the bottom of the OP. It does what I want. – Alit
"Why does Observable.Subscribe create multiple observers behind the scenes? ... It doesn't seem like the way most people would want to use the pattern." - it's not true to say that it creates multiple observers; rather, it creates a new instance of each of the operators in the entire pipeline for each subscription. Now, that's exactly what the IEnumerable<T> extensions do. There's no difference with observables. – Cassella
@ChristianFindlay - Theo's answer certainly is the correct way to handle what you're asking for, but I wanted to point out that you're doing a lot of dangerous things with your code as far as Rx is concerned. If you ever find yourself doing Observable.Create( with return Disposable.Create(() => { }); or return Disposable.Empty; you are probably doing something wrong. Likewise whenever you have a Task.Run(async () => and a while (true). And also when you implement your own IObservable<T>. I'm happy to chat and discuss. – Cassella
B
5

First, you must familiarize yourself with the theory of "cold" and "hot" observables. Here is the definition from the Introduction to RX.

  1. Cold are sequences that are passive and start producing notifications on request (when subscribed to).
  2. Hot are sequences that are active and produce notifications regardless of subscriptions.

What you want is a hot observable, and the problem is that the Observable.Create method creates cold observables. But you can make any observable hot by using the Publish operator. This operator provides a way to have a single underlying subscription shared by multiple independent observers. Example:

int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
    _ = Task.Run(async () =>
    {
        while (true)
        {
            observer.OnNext(++index);
            await Task.Delay(1000);
        }
    });
    return Disposable.Empty;
});

IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop

hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));

The coldObservable created by the Observable.Create is subscribed when the hotObservable.Connect method is invoked, and then all notifications generated by that single subscription are propagated to all subscribers of the hotObservable.

Output:

Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...

Important: the purpose of the example above is to demonstrate the Publish operator, and not to serve as an example of good quality RX code. One of its problems is that, because the observers subscribe after the source has been connected, it is theoretically possible that the first notification will not be sent to some or all of the observers: it may be produced before they subscribe. In other words, there is a race condition.
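
One way to sidestep that race is to attach the observers first and call Connect last. The following is only a sketch under that assumption, not a recommendation for production code: the names PublishConnectDemo and Run are hypothetical, the System.Reactive package is assumed, and the source emits synchronously so that the ordering is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class PublishConnectDemo
{
    // Subscribes two observers to a published observable, then connects.
    // Returns what each observer received plus the number of source subscriptions.
    public static List<string> Run()
    {
        var log = new List<string>();
        var sourceSubscriptions = 0;

        var cold = Observable.Create<int>(observer =>
        {
            sourceSubscriptions++;
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            // Everything was emitted synchronously, so there is nothing to cancel.
            return Disposable.Empty;
        });

        var hot = cold.Publish();

        // Both observers are attached before the source starts producing.
        using (hot.Subscribe(x => log.Add($"A{x}")))
        using (hot.Subscribe(x => log.Add($"B{x}")))
        using (hot.Connect()) // Subscribes to the cold source exactly once
        {
        }

        log.Add($"subscriptions={sourceSubscriptions}");
        return log;
    }
}
```

Because both observers are in place before Connect is called, neither of them can miss the first notification, and the source is still subscribed to only once.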

There is an alternative method of managing the lifetime of an IConnectableObservable, the operator RefCount:

Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

var hotObservable = coldObservable.Publish().RefCount();

This way you don't need to Connect manually. The connection occurs automatically with the first subscription, and it is disposed automatically with the last unsubscription.
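
A small sketch of that lifetime, assuming the System.Reactive package: a Subject<int> is used here purely as a controllable stand-in for the real data source, and RefCountDemo and Run are hypothetical names chosen for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class RefCountDemo
{
    public static (int UpstreamSubscriptions, bool ConnectedAfterDispose, List<string> Log) Run()
    {
        var log = new List<string>();
        var upstreamSubscriptions = 0;

        // Stand-in source that the demo can push values into by hand.
        var source = new Subject<int>();

        var cold = Observable.Create<int>(observer =>
        {
            upstreamSubscriptions++;
            // Returning the inner subscription lets disposal propagate upstream.
            return source.Subscribe(observer);
        });

        var shared = cold.Publish().RefCount();

        // The first Subscribe connects; the second one shares that connection.
        var a = shared.Subscribe(x => log.Add($"A{x}"));
        var b = shared.Subscribe(x => log.Add($"B{x}"));

        source.OnNext(1);

        // Disposing the last subscription disconnects from the source.
        a.Dispose();
        b.Dispose();

        return (upstreamSubscriptions, source.HasObservers, log);
    }
}
```

In this sketch the source should be subscribed to once for both observers, and source.HasObservers should be false once both subscriptions have been disposed.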

Bid answered 24/10, 2020 at 8:45 Comment(9)
Brilliant, thanks! I have read about the difference between cold and hot before, but I hadn't connected the dots here, and I wasn't able to find a good example of Publish. Now I understand how it works and I can see why having both options is a good thing. Most importantly, it means that I don't have to reinvent the wheel when I publish the next version of my library. The reactive extensions work in the way I need them to. – Alit
I'm going to write about this. What's your web or Twitter address so I can link back to you? – Alit
@ChristianFindlay you could just link to my SO profile. 😃 – Bid
Done! christianfindlay.com/2020/10/25/rx-hot-vs-cold. I would appreciate a critique. Happy to correct any mistakes. @theodor-zoulias – Alit
@TheodorZoulias - You've correctly used Publish to solve the OP's issue, but the Observable.Create you're using is terrible. Once subscribed to, it can never end. You should never use a return Disposable.Empty; on an Observable.Create. In this case the observable Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0)).Select(x => (int)x) produces the same sequence, yet it behaves properly. – Cassella
@ChristianFindlay bravo! Very nice article! – Bid
@Cassella yep, definitely. The code in my answer is intended to demonstrate a point, and is nowhere near being an example of robust and good quality RX code. – Bid
@TheodorZoulias - Fair enough, but I'd suggest putting in a disclaimer on that method and a suggestion on the right way to do it. – Cassella
@Cassella I added the disclaimer already. But I would prefer to avoid adding more code to the answer, since it is already quite long. – Bid
C
2

I've added this as an answer because I feel that the code that Christian posted in his answer is dangerous as it's mixing Tasks and Rx and there are race conditions.

Here's an alternative that fixes most of these issues:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IDisposable Subscriber(IObservable<string> observable, string name) =>
        observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData());
                
        var publisher = coldObservable.Publish();

        var subscriptions =
            new CompositeDisposable(
                Subscriber(publisher, "One"),
                Subscriber(publisher, "Two"),
                publisher.Connect());

        await Task.Delay(TimeSpan.FromSeconds(5.0));

        subscriptions.Dispose();
    }
}

Better yet, though, I would look at doing it this way:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
        observable.Select(s => $"Name: {name} Message: {s}");
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData())
                .Do(_ => Debug.WriteLine("Called GetData()"))
                .Publish(published =>
                    Observable
                        .Merge(
                            Subscriber(published, "One"),
                            Subscriber(published, "Two")))
                .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
                .Do(x => Debug.WriteLine(x));
    
        await coldObservable;
    }
}

It's always best to use the inbuilt operators for Rx rather than hybrid approaches with tasks.

Cassella answered 25/10, 2020 at 3:17 Comment(6)
Very nice. The trouble had been that the documentation doesn't explain this stuff very well. Hence my article. I'll borrow this and update my article if that's ok. – Alit
Question: what is the purpose of merging the subscribers? A new subscriber could come along at any point in time. – Alit
Nice answer! I am a bit skeptical though about using the Merge operator to merge the notifications of multiple "subscribers" (IObservable<string>s), because it most probably adds synchronization overhead. – Bid
@ChristianFindlay - I refactored your code because you're creating a test. I wanted the two subscriptions to be inlined in the query so that the final thing could be awaited. – Cassella
@TheodorZoulias - Rx is just one big library of synchronization overhead. It's there to just make it (somewhat) trivial to write complex queries that run in parallel and are (mostly) thread-safe. – Cassella
My understanding about the RX library, based on this document (downloadable PDF), is that the RX operators trust their source observable to be already synchronized and well behaved, and don't add extra synchronization unless the nature of the specific operator requires it. And Merge is most probably such an operator, because the notifications of its source observables can come from multiple threads concurrently. – Bid
A
1

Thanks to the answer above, I eventually got the desired result without having to implement IObservable. Theodor was correct. The answer was to convert the IObservable to hot with the Publish() method.

I wrote an article about this here

While this works, Enigmativity's answer above is far better.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Observables
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    [TestClass]
    public class UnitTest1
    {
        static string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            var coldObservable = Observable.Create<string>(observer =>
            {
                _ = Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var data = GetData();
                        observer.OnNext(data);
                        await Task.Delay(1000);
                    }
                }, cancellationToken);

                return Disposable.Empty;
            });

            var publisher = coldObservable.Publish();
            var connection = publisher.Connect();

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; i < 5; i++)
            {
                if (i == 4)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }

            connection.Dispose();
        }
    }
}
Alit answered 24/10, 2020 at 9:9 Comment(4)
You've really mixed up Tasks and observables here. This code can be written in a more Rx friendly and safe way. – Cassella
You're also connecting to your publisher before subscribing to it. You are likely to miss values. – Cassella
@Cassella AFAIK tasks and RX can coexist harmoniously. There are multiple operators in the library that expect Task-related arguments or produce Task values. The ToTask operator for example. – Bid
@TheodorZoulias - Moving to and from tasks can cause race conditions or deadlocks. It's best to try to avoid changing. Do remember that a Task is a future with a single value and an observable is a future with zero or more values - so they are not 100% compatible. I try to limit jumping back and forth. – Cassella
