Publish the last value of an observable

I have a hot observable (a subject in this case):

var subject = new Rx.Subject();

I want to create another observable that, every time a new subscription is made, immediately fires out the last value that was produced.

So in pseudo code:

var myObservableWithLastValue = subject.publishLast();

subject.onNext(3);

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 3
});

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 3, too
});

subject.onNext(4);

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 4
});

This is roughly what I want, and it seems to work. However, I guess there must be some built-in mechanism to achieve the same:

Rx.Observable.prototype.keepLatest = function () {
    var latestValue;

    var disposable = this.subscribe(function (value) {
        latestValue = value;
    });

    return Rx.Observable.create(function (observer) {
        observer.onNext(latestValue);
        return disposable.dispose;
    });
};
Chefoo answered 16/3, 2012 at 10:40 Comment(0)
C
22

RxJS now has ReplaySubject. Initialize it with a buffer size of 1 and it behaves like a BehaviorSubject, except that it needs no initial value.

// as an example, use buffer size of 2
var subject = new Rx.ReplaySubject(2 /* buffer size */);

subject.onNext('a');
subject.onNext('b');
subject.onNext('c');

subject.subscribe(function (x) { document.write('x1:' + x + '<br>'); });

subject.onNext('d');

subject.subscribe(function (x) { document.write('x2:' + x + '<br>'); });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>
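
To make the replay behaviour concrete, here is a minimal, dependency-free sketch of what a replay buffer does. `MiniReplaySubject` is a hypothetical name invented for this illustration; it is not the RxJS implementation and it ignores completion, errors and schedulers. It runs the scenario from the question with a buffer size of 1.

```javascript
// Hypothetical, dependency-free sketch of replay semantics (not the RxJS source).
function MiniReplaySubject(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];      // most recent values, oldest first
    this.observers = [];   // currently subscribed callbacks
}

// Push a value: remember it, trim the buffer, notify current subscribers.
MiniReplaySubject.prototype.onNext = function (value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
        this.buffer.shift();
    }
    this.observers.slice().forEach(function (observer) {
        observer(value);
    });
};

// Subscribe: replay the buffered values first, then receive live values.
MiniReplaySubject.prototype.subscribe = function (observer) {
    var observers = this.observers;
    this.buffer.slice().forEach(function (value) {
        observer(value);
    });
    observers.push(observer);
    return {
        dispose: function () {
            var index = observers.indexOf(observer);
            if (index !== -1) {
                observers.splice(index, 1);
            }
        }
    };
};

// The scenario from the question, with a buffer size of 1.
var seen = { a: [], b: [] };
var subject = new MiniReplaySubject(1);

subject.onNext(3);
subject.subscribe(function (x) { seen.a.push(x); }); // replays 3
subject.onNext(4);
subject.subscribe(function (x) { seen.b.push(x); }); // replays 4 only

console.log(seen.a); // [ 3, 4 ]
console.log(seen.b); // [ 4 ]
```

Note that an early subscriber gets the replayed value and then every later value as well, so the first subscriber above sees both 3 and 4.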
Charlot answered 10/1, 2014 at 12:21 Comment(6)
@gajus-kuizinas Can you tell me what the major difference is between your code and the original? I'm afraid I'm missing something. Additionally, it sets the buffer size to 2, so the last 2 values will be replayed every time. - Charlot
None. I just made the example interactive and complete. The buffer size is used to demonstrate the difference from using BehaviorSubject in the @Asti answer. That's the reason I did not create a new answer, but just contributed to yours. - Pronghorn
@GajusKuizinas Ok. I see your point. Improved a little more to show the results in the results window instead of the console :) - Charlot
You really shouldn't be using document.write for this purpose. console.log lets you inspect and interact with the output. That's an opinion-based suggestion, though. - Pronghorn
@GajusKuizinas With console.log you can't see anything in the results window. Do you have any better suggestion? console.log is ok but not too interactive. - Charlot
It is by design that you cannot see anything in the browser window... because you can see everything in your Console window (dev tools). - Pronghorn
H
7

BehaviorSubject:

Initializes a new instance of the Rx.BehaviorSubject class which creates a subject that caches its last value and starts with the specified value.

var subject = new Rx.BehaviorSubject('a' /* initial value */);

subject.subscribe(function (x) {
    console.log('x1:' + x);
});

subject.onNext('d');

// Will produce the last value.

subject.subscribe(function (x) {
    console.log('x2:' + x);
});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>
Hussey answered 16/3, 2012 at 14:42 Comment(4)
Ok, but is there no built-in operator that takes an existing stream and applies the sketched behaviour? - Chefoo
@Chefoo Have you tried swapping your ordinary subject for a BehaviorSubject? - Hussey
I'm not dealing with subjects directly. This was just to illustrate the behaviour better. I want an operator that applies to any hot infinite stream. - Chefoo
Subjects implement both IObserver<T> and IObservable<T>. You should be able to get the BehaviorSubject to subscribe to your IObservable and then, in turn, subscribe to the BehaviorSubject. - Sapp
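
The wiring described in the last comment (one caching object subscribes to the source, and consumers subscribe to that object) can be sketched without the library as follows. Everything here is an assumption made for illustration: `createHotSource` and `keepLatest` are hypothetical helpers, and the source is only assumed to expose `subscribe(callback)` returning an object with `dispose()`. In RxJS itself, the caching object would be the BehaviorSubject mentioned above rather than hand-written code.

```javascript
// Minimal hot source for the demo (hypothetical stand-in for a subject).
function createHotSource() {
    var listeners = [];
    return {
        onNext: function (value) {
            listeners.slice().forEach(function (listener) { listener(value); });
        },
        subscribe: function (listener) {
            listeners.push(listener);
            return {
                dispose: function () {
                    var index = listeners.indexOf(listener);
                    if (index !== -1) { listeners.splice(index, 1); }
                }
            };
        }
    };
}

// Hypothetical helper: wraps any source exposing subscribe(callback) -> { dispose }.
function keepLatest(source) {
    var hasValue = false;
    var latestValue;
    var observers = [];

    // Subscribe to the source once, up front, so the latest value is tracked
    // even before anyone subscribes to the wrapper.
    var upstream = source.subscribe(function (value) {
        hasValue = true;
        latestValue = value;
        observers.slice().forEach(function (observer) { observer(value); });
    });

    return {
        subscribe: function (observer) {
            // Emit the cached value immediately, then forward live values.
            if (hasValue) { observer(latestValue); }
            observers.push(observer);
            return {
                dispose: function () {
                    var index = observers.indexOf(observer);
                    if (index !== -1) { observers.splice(index, 1); }
                }
            };
        },
        // Stops tracking the source altogether.
        dispose: function () { upstream.dispose(); }
    };
}

var source = createHotSource();
var latest = keepLatest(source);
var first = [];
var second = [];

source.onNext(3);
var firstSubscription = latest.subscribe(function (x) { first.push(x); }); // gets 3 at once
source.onNext(4);
latest.subscribe(function (x) { second.push(x); }); // gets 4 at once
firstSubscription.dispose();
source.onNext(5);

console.log(first);  // [ 3, 4 ]
console.log(second); // [ 4, 5 ]
```

Unlike the `keepLatest` attempt in the question, this sketch keeps forwarding later values to each subscriber, and disposing one subscriber does not tear down the shared upstream subscription.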
P
0

I have used .cache(1) on a hot Observable. It seems to give the behaviour you asked for (but I am a novice):

"observable that, every time a new subscription is made, immediately fires out the last value that was produced."

Piassava answered 3/12, 2016 at 8:52 Comment(1)
FYI, my understanding is that cache has been removed from the upcoming release. - Begat
