Update: check out the example at the bottom
I need to message between classes. The publisher will loop indefinitely, call some method to get data, and then pass the result of that call into OnNext
. There can be many subscribers, but there should only ever be one IObservable, and one long-running task. Here is an implementation.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject, 1.ToString());
new Subscriber(subject, 2.ToString());
new Subscriber(subject, 3.ToString());
//Run the loop for 3 seconds
await Task.Delay(3000);
}
class Publisher
{
public Publisher(IObserver<string> observer)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
observer.OnNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
}
}
Output:
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
This works fine. Notice that only one IObserver
sends messages, but all subscriptions pick up the message. But, how do I separate the IObservable
and the IObserver
? They are glued together as a Subject
. Here is another approach.
[TestMethod]
public async Task RunMessagingAsync2()
{
var observers = new List<IObserver<string>>();
var observable = Observable.Create(
(IObserver<string> observer) =>
{
observers.Add(observer);
Task.Run(async () =>
{
while (true)
{
try
{
observer.OnNext(GetSomeData());
}
catch (Exception ex)
{
observer.OnError(ex);
}
await Task.Delay(500);
}
});
return Disposable.Create(() => { });
});
//Create a class and inject the subject as IObservable
new Subscriber(observable);
new Subscriber(observable);
//Run the loop for 10 seconds
await Task.Delay(10000);
Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}
The problem here is that this creates two separate Task
s and two separate IObserver
s. Every subscription creates a new IObserver. You can confirm that because the Assert
here fails. This doesn't really make any sense to me. From what I understand of Reactive programming, I wouldn't expect the Subscribe
method here to create a new IObserver
each time. Check out this gist. It is a slight modification of the Observable.Create example. It shows how the Subscribe method causes an IObserver to be created each time it is called. How can I achieve the functionality from the first example without using a Subject
?
Here is another approach that does not use Reactive UI at all... You could create a Subject
from the publisher if you want to, but it is not necessary.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
class Publisher
{
public Publisher(Action<string> onNext)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
onNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
//Listen for OnNext and write to the debug window when it happens
public void ReceiveMessage(string message) => Debug.WriteLine(message);
}
[TestMethod]
public async Task RunMessagingAsync()
{
//Create a class and inject the subject as IObservable
var subscriber = new Subscriber();
//Create a class and inject the subject as IObserver
new Publisher(subscriber.ReceiveMessage);
//Run the loop for 10 seconds
await Task.Delay(10000);
}
}
}
Lastly, I should add that ReactiveUI used to have a MessageBus class. I'm not sure if it got removed or not, but it is no longer recommended. What do they suggest we use instead?
Working Example
This version is correct. I guess the only thing I'm asking now is how do I do the equivalent of this with Observable.Create
? The problem with Observable.Create
is that it runs the action for each subscription. That is not the intended functionality. The long running task here only runs once no matter how many subscriptions there are.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnitTestProject1
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
internal class BasicObservable<T> : IObservable<T>
{
List<IObserver<T>> _observers = new List<IObserver<T>>();
public BasicObservable(
Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default
) =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
var data = getData();
_observers.ForEach(o => o.OnNext(data));
}
catch (Exception ex)
{
_observers.ForEach(o => o.OnError(ex));
}
}
_observers.ForEach(o => o.OnCompleted());
}, cancellationToken);
public IDisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return Disposable.Create(observer, (o) => _observers.Remove(o));
}
}
public static class ObservableExtensions
{
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, default, cancellationToken);
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, interval, cancellationToken);
}
[TestClass]
public class UnitTest1
{
string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
Func<string> getData = GetData;
var publisher = getData.CreateObservable(cancellationToken);
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; true; i++)
{
if (i >= 5)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
}
}
}
AsObservable()
to "separate" the components by hiding the source? – RoamAsObservable()
on theSubject
, it returns aIObservable
you can use normally, but you don't have "access" to the original source. But why is using aSubject
directly an issue, you work against theIObservable
interface anyway? – RoamAsObservable()
doesn't really achieve anything other than hiding the identity of the original. "But why is using a Subject directly an issue" Turn that question around. I have used the pattern above withSubject
for messaging. Is this the recommended approach? I thought we should separate theObserver
and theObservable
– AlitIObserver
is created regardless of how many subscribers there are, is probably false. Each time you call theSubscribe
method with aAction<T> onNext
lambda as argument, a newAnonymousObserver<T>
is created implicitly behind the scenes. ThisSubscribe
is a utility extension method, to make subscribing easier. – BidOnError
andOnCompleted
notifications in this case? Would you like to see a single observer receiving multipleOnCompleted
notifications? This goes against the RX conventions. – BidSubscribe()
is called - even on aSubject
. However, in my first example, one IObserver controls all subscriptions. That is the difference. That is the functionalty I need to achieve. – AlitSubject
controls all subscriptions, but the subscriptions are associated with theIObservable
nature of theSubject
, not itsIObserver
nature. Maybe you are confused with the dual nature of theSubject<T>
class? – BidSubject
without using explicitly aSubject
? In that case you should look at thePublish
, andReplay
operators (that use differentISubject
flavors behind the scenes). – BidObservable.Create
method in order to create observables and can be subscribed by multiple observers, and can send notifications to all of their subscribed observers. Correct? – BidIEnumerable<T>
extensions do. There's no difference with observables. – CassellaObservable.Create(
withreturn Disposable.Create(() => { });
orreturn Disposable.Empty;
you are probably doing something wrong. Likewise whenever you have aTask.Run(async () =>
and awhile (true)
. And also when you implement your ownIObservable<T>
. I'm happy to chat and discuss. – Cassella