RxJS Observable: performing cleanup when the last subscription is disposed?

What is the cleanest way to perform a side effect when the last subscription to an RxJS Observable is disposed? This may happen before the Observable has terminated.

Let's say that I need a function returning an Observable that emits changes to a resource. I'd like to perform a cleanup action when all subscriptions have been disposed.

var observable = streamResourceChanges(resource);
var subscription1 = observable.subscribe(observer1);
var subscription2 = observable.subscribe(observer2);
// ...
subscription1.dispose();  // Does not perform the cleanup
subscription2.dispose();  // Performs the cleanup

The only way I've found to define a subscription disposal action is to return a cleanup function from Rx.Observable.create. Running it only on the last disposal can then be handled by sharing a single underlying subscription, for example with Observable.prototype.singleInstance().

For example:

function streamResourceChanges(resource) {
    return Rx.Observable.create(function(observer) {
        // Subscribe the observer for resource changes...
        // Return a cleanup function
        return function() {
            // Perform cleanup here...
            console.log("Cleanup performed!");
        };
    }).singleInstance();
}
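
To make the sharing part concrete, here is a library-free sketch of the reference counting that a shared subscription relies on. This is not RxJS code and not how singleInstance() is actually implemented; shareWithRefCount and the plain-callback observer shape are hypothetical names chosen only for illustration.

```javascript
// Library-free sketch (NOT RxJS): share one underlying subscription and
// run its cleanup only when the last subscriber is disposed.
// `shareWithRefCount` is a hypothetical helper name.
function shareWithRefCount(subscribeSource) {
    var observers = [];
    var cleanup = null;

    return {
        subscribe: function (observer) {
            observers.push(observer);
            if (observers.length === 1) {
                // First subscriber: connect to the underlying source once.
                cleanup = subscribeSource(function (value) {
                    // Iterate over a copy so disposal during emission is safe.
                    observers.slice().forEach(function (o) { o(value); });
                });
            }
            var disposed = false;
            return {
                dispose: function () {
                    if (disposed) { return; }  // disposing twice is a no-op
                    disposed = true;
                    observers.splice(observers.indexOf(observer), 1);
                    if (observers.length === 0 && cleanup) {
                        // Last subscriber gone: release the source.
                        var fn = cleanup;
                        cleanup = null;
                        fn();
                    }
                }
            };
        }
    };
}

// Usage: two subscribers, cleanup runs only after the second disposal.
var log = [];
var emit = null;
var shared = shareWithRefCount(function (next) {
    emit = next;
    return function () { log.push("cleanup"); };
});
var s1 = shared.subscribe(function (v) { log.push("a:" + v); });
var s2 = shared.subscribe(function (v) { log.push("b:" + v); });
emit(1);
s1.dispose();   // one subscriber remains, no cleanup yet
emit(2);
s2.dispose();   // last subscriber gone, cleanup runs
```

The point of the sketch is only that the cleanup is tied to the subscriber count reaching zero, not to the source terminating.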

Is there a neater way to define a side-effect for subscription disposal, similar to doOnNext, doOnCompleted or doOnError?

var withCleanup = withoutCleanup.doOnDispose(function() {
    // Perform cleanup here...
});
Thanos answered 26/10, 2015 at 19:19 Comment(0)

Two choices come to mind depending upon your actual use case:

.finally()

source.finally(() => console.log("cleaning up")).singleInstance()
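
As a rough mental model of the behaviour discussed in the comments (the action runs on disposal as well as on termination), here is a toy version without RxJS. It is a sketch under assumptions, not the library's implementation: withFinally and the subscribe(onNext, onCompleted) shape are hypothetical, and error handling is left out.

```javascript
// Toy model (NOT RxJS): run `action` exactly once when a subscription ends,
// whether the source completes or the subscriber disposes first.
// `withFinally` and the `subscribe(onNext, onCompleted)` shape are hypothetical.
function withFinally(source, action) {
    return {
        subscribe: function (onNext, onCompleted) {
            var finished = false;
            function finish() {
                if (finished) { return; }  // guard: run the action only once
                finished = true;
                action();
            }
            var inner = source.subscribe(onNext, function () {
                if (onCompleted) { onCompleted(); }
                finish();            // termination path
            });
            return {
                dispose: function () {
                    inner.dispose();
                    finish();        // manual disposal path
                }
            };
        }
    };
}

// A stand-in source that never completes on its own.
var never = {
    subscribe: function () {
        return { dispose: function () {} };
    }
};

var finallyCalls = 0;
var sub = withFinally(never, function () { finallyCalls += 1; })
    .subscribe(function () {});
sub.dispose();  // action runs although the source never terminated
sub.dispose();  // a second dispose does not run the action again
```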

.using()

Rx.Observable
    .using(
        // allocate some disposable resource during subscribe.
        // resource.dispose() will be called during unsubscribe.
        () => new SomeResource(),

        // use the disposable resource to create your observable
        // for example...
        resource => Rx.Observable.interval(resource.time))
    .singleInstance();
Cervantez answered 26/10, 2015 at 20:41 Comment(4)
Thanks! The using method sounds like a valid alternative to create! The finally callback runs only when the Observable terminates, which may not necessarily be the case when the last subscription is disposed. Sorry I wasn't very specific about this; I edited my question. - Thanos
The documentation does not state it very well. finally actually calls the action when the subscription is disposed, which includes both termination and manual disposal of the subscription. - Cervantez
I tried finally and you are totally correct! The documentation currently says nothing about this behavior (and it also has an incorrect description for the callback parameter). So, this is exactly what I was looking for! Thank you! - Thanos
In newer RxJS versions, .finally() has been renamed to the pipeable finalize() operator. - Limey
