RxJS: How would I "manually" update an Observable?

3

174

I think I must be misunderstanding something fundamental, because in my mind this should be the most basic case for an observable, but for the life of me I can't figure out how to do it from the docs.

Basically, I want to be able to do this:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

But I have not been able to find a method like push. I'm using this for a click handler, and I know they have Observable.fromEvent for that, but I'm trying to use it with React and I'd rather be able to simply update the datastream in a callback, instead of using a completely different event handling system. So basically I want this:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

The closest I got was using observer.onNext('foo'), but that didn't seem to actually work and that's called on the observer, which doesn't seem right. The observer should be the thing reacting to the data stream, not changing it, right?

Do I just not understand the observer/observable relationship?

Drone answered 24/10, 2015 at 22:57 Comment(3)
Have a look at this to clarify your idea (The introduction to Reactive Programming you've been missing): gist.github.com/staltz/868e7e9bc2a7b8c1f754. There is also a collection of resources here that can improve your understanding: github.com/Reactive-Extensions/RxJS#resources - Potamic
I'd checked out the first; it seems like a solid resource. The second one is a great list; on it I found aaronstacy.com/writings/reactive-programming-and-mvc, which helped me discover Rx.Subject, which solves my problem. So thanks! Once I've written a bit more of the app I'll post my solution; I just want to battle-test it a bit. - Drone
Hehe, thank you very much for asking this question, I was about to ask the very same question with the very same code sample in mind :-) - Mersey
176

In Rx, Observer and Observable are distinct entities. An Observer subscribes to an Observable. An Observable emits items to its observers by calling the observers' methods. If you need to call the observer methods outside the scope of Observable.create(), you can use a Subject, which is a proxy that acts as an Observer and an Observable at the same time.

You can do it like this:

// A Subject is both an Observable (subscribe) and an Observer (next)
var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// Push a value into the stream from any callback
var my_function = function () {
    eventStream.next('foo');
};
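To see why one object can play both roles, here is a minimal, dependency-free sketch. TinySubject is a hypothetical name invented for this illustration; it is not how RxJS implements Subject, and it leaves out error and completion handling entirely. It only shows the core idea: subscribe() registers a callback (the Observable side) and next() forwards a value to every registered callback (the Observer side).

```javascript
// Simplified illustration only; NOT the RxJS implementation of Subject.
class TinySubject {
  constructor() {
    this.observers = [];
  }

  // Observable side: register a callback and return a handle to remove it.
  subscribe(onNext) {
    this.observers.push(onNext);
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((fn) => fn !== onNext);
      },
    };
  }

  // Observer side: push a value to every current subscriber.
  next(value) {
    // Iterate over a copy so unsubscribing during delivery is safe.
    for (const fn of this.observers.slice()) {
      fn(value);
    }
  }
}

const log = [];
const events = new TinySubject();
const subscription = events.subscribe((x) => log.push('next: ' + x));

events.next('foo');          // delivered to the subscriber
subscription.unsubscribe();
events.next('bar');          // nobody is listening any more

console.log(log);            // [ 'next: foo' ]
```

With the real Rx.Subject the calling code looks the same: the click handler or React callback calls next() on the subject, and everything downstream subscribes to it.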

You can find more information about subjects in the RxJS documentation.

Mer answered 26/10, 2015 at 21:48 Comment(3)
This is actually exactly what I wound up doing! I kept working on it to see if I could find a better way to do what I needed to do, but this is definitely a viable solution. I saw it first in this article: aaronstacy.com/writings/reactive-programming-and-mvc. - Drone
How would one do it if they are not able to use Subjects and must use observables? - Henning
@IanSteffy What if you create a new Observable (via merge) from the old Observable and a new Subject, so that you can use this Subject to feed the new Observable? - Prestidigitation
40

I believe Observable.create() does not take an observer as callback param but an emitter. So if you want to add a new value to your Observable try this instead:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Yes, a Subject makes this easier by providing the Observable and the Observer in the same object, but the two approaches are not exactly equivalent: a Subject lets you subscribe multiple observers to the same observable, whereas with the approach above the emitter variable is overwritten on every subscription, so values reach only the last subscribed observer. Use it consciously. Here's a JsBin if you want to tinker with it.
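The "last subscribed observer" caveat can be reproduced without RxJS. In the sketch below, createObservable is a hypothetical stand-in for Observable.create, written only for this illustration. It assumes the behaviour relevant here: the producer function runs once per subscribe() call, so a variable captured from it is reassigned each time.

```javascript
// Hypothetical stand-in for Observable.create, for illustration only.
// Like a cold observable, it runs the producer once per subscribe() call.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer);
    },
  };
}

let emitter;
const observable = createObservable((e) => {
  emitter = e; // reassigned on every subscription
});

const seenByFirst = [];
const seenBySecond = [];

observable.subscribe({ next: (v) => seenByFirst.push(v) });
observable.subscribe({ next: (v) => seenBySecond.push(v) }); // overwrites emitter

emitter.next('foo');

console.log(seenByFirst);  // []
console.log(seenBySecond); // [ 'foo' ]
```

The first subscriber never sees 'foo' because the only reference to its emitter was replaced when the second subscription was made.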

Canfield answered 9/1, 2017 at 21:40 Comment(3)
Is the possibility of overwriting the emitter property documented anywhere in the RxJS manuals? - Portuna
In this case the emitter will only next() new values for the observer that subscribed last. A better approach would be to collect all emitters in an array, iterate through them, and next the value on each of them. - Germinate
What should replace the deprecated Observable.create() call, then? I tried new Observable(emitter), but it's not behaving as I expected. https://mcmap.net/q/144626/-detecting-change-of-a-behaviorsubject-component-member-variable-updated-in-a-subscribe-block/958373 - Pasteurize
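The suggestion above to collect every emitter in an array could look roughly like the following. This is a sketch under assumptions, not RxJS code: createObservable is the same hypothetical stand-in for Observable.create (the producer runs once per subscription), and emit is a helper name invented here. It also never removes an emitter when an observer unsubscribes, which a real implementation would need to handle.

```javascript
// Hypothetical stand-in for Observable.create, for illustration only.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer);
    },
  };
}

// Keep every emitter instead of overwriting a single variable.
const emitters = [];
const observable = createObservable((e) => {
  emitters.push(e);
});

const seenByFirst = [];
const seenBySecond = [];
observable.subscribe({ next: (v) => seenByFirst.push(v) });
observable.subscribe({ next: (v) => seenBySecond.push(v) });

// Invented helper: forward one value to every collected emitter.
function emit(value) {
  emitters.forEach((e) => e.next(value));
}

emit('foo');

console.log(seenByFirst);  // [ 'foo' ]
console.log(seenBySecond); // [ 'foo' ]
```

At this point the code is effectively a hand-built Subject, which is one reason the accepted answer's Rx.Subject is usually the simpler choice.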
-1

var observer = Observable.subscribe(
function(x){
 console.log('next: ' + 
var my_function = function(){
Observable.push('hello')

One way to update an observable.

Hume answered 28/11, 2022 at 10:58 Comment(0)
