RxJS pushing the current value to late subscribers
Asked Answered
A

2

6

Following is an HTML snippet I'm using as an example:

<html>
<head>
    <script src="rx.all.js"></script>
</head>
<body>
    <script>
        var source = new Rx.BehaviorSubject(function() {return 2;});

        var stuff = source.scan([], function(val, operation) {
            return operation(val);
        });

        stuff.subscribe(function(v) {
            console.log("first subscriber");
            console.log(v);
        });

        source.onNext(function(val) {
            return val * 2;
        });

        stuff.subscribe(function(v) {
            console.log("second subscriber");
            console.log(v);
        });
    </script>
</body>

The output in the JS console is:

first subscriber
2
first subscriber
4
second subscriber
0 

Now, "stuff" does some processing (basically applying a function to the current value), an idea which I've taken from TodoMVC example for ReactJs + RxJS (https://github.com/fdecampredon/react-rxjs-todomvc).

The output I'm trying to achieve is to have the second subscriber also see "4" the moment it subscribes. I'm using RxJS in conjunction with ReactJS, so components unsubscribe when they're unmounted (due to a route change), and subscribe again when they're mounted again.

Alger answered 19/8, 2014 at 11:15 Comment(0)
D
3

Use replay(1) to change your stuff into a hot observable that will remember the last value and supply it to late-comers:

var stuff = ...;
stuff = stuff.replay(1);

// go ahead and start it listening
// you could use .refCount() here also
var subscription = stuff.connect();

// now subscribe your observers
stuff.subscribe(...);

// send some results
source.onNext(...);
source.onNext(...);

// subscribe another observer
// will see the lastest value
// as well as any new values
stuff.subscribe(...);

source.onNext(...);
// ...
Dasteel answered 15/9, 2014 at 14:31 Comment(3)
Currently, replay(1) does not work in RxJS. You will have to use replay(null, 1).Schizont
replay(null, 1) did nothing for me with RxJS 4.1.0. Using replay(x => x, 1) instead.Crocodilian
replay no longer exists in RXJSJabin
J
1

To update this question as replay has now been removed from the list of operators in rxjs. One strategy is to use shareReplay

...situations where you know you will have late subscribers to a stream that need access to previously emitted values.

So you want code something like

const $obs: Subject<boolean>;

const $newSharedObs = $obs.pipe(
   shareReplay({bufferSize: 1, refCount: false })
);

$obs.next(true);

//will log true even though subscribe is called after next
$newSharedObs.subscribe(s => {
   console.log(s);
});

Notice the refCount: false which will keep the observable "hot" even when their are no subscribers:

If refCount is true, the source will be unsubscribed from once the reference count drops to zero, i.e. the inner ReplaySubject will be unsubscribed. All new subscribers will receive value emissions from a new ReplaySubject which in turn will cause a new subscription to the source observable. If refCount is false on the other hand, the source will not be unsubscribed meaning that the inner ReplaySubject will still be subscribed to the source (and potentially run for ever).

Jabin answered 8/3, 2022 at 13:27 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.