How to explicitly unsubscribe from an Observable after onComplete
In the following code, how and where should I call unsubscribe so that the Observable is explicitly unsubscribed right after onCompleted finishes?

getObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(new Subscriber<Boolean>() {
        @Override
        public void onCompleted() {
            doSomething();
        }

        @Override
        public void onError(Throwable e) {
            thereIsError();
        }

        @Override
        public void onNext(Boolean status) {
            updateView();
        }
    });
Delphinedelphinia answered 24/12, 2017 at 7:44 Comment(0)
C
5

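You can keep the handle returned by the subscribe method and release it in the completion callback. The question uses the RxJava 1 API (Subscriber, onCompleted), where subscribe returns a Subscription rather than a Disposable. A local variable cannot be read inside the anonymous class before it has been assigned, but a Subscriber is itself a Subscription, so it can call unsubscribe() on itself:

Subscription s =
getObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(new Subscriber<Boolean>() {
        @Override
        public void onCompleted() {
            doSomething();
            unsubscribe(); // Subscriber implements Subscription
        }

        @Override
        public void onError(Throwable e) {
            thereIsError();
        }

        @Override
        public void onNext(Boolean status) {
            updateView();
        }
    });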
Charolettecharon answered 25/12, 2017 at 13:12 Comment(1)
What if there's an exception and onError gets called? Will that lead to memory leaks because the stream is not disposed? - Cronk
B
5
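With RxJava 2, add the DisposableObserver returned by subscribeWith to a CompositeDisposable:

// Assumed field on the Activity: collects subscriptions so they can be released together
private final CompositeDisposable disposables = new CompositeDisposable();

disposables.add(sampleObservable()
    // Run on a background thread
    .subscribeOn(Schedulers.io())
    // Be notified on the main thread
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableObserver<String>() {
        @Override
        public void onComplete() {
            // Do some work for completion
        }

        @Override
        public void onError(Throwable e) {
            // Do some work for error
        }

        @Override
        public void onNext(String value) {
            // Do some work for next
        }
    }));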

Then release everything when the Activity is destroyed:

@Override
protected void onDestroy() {
    super.onDestroy();
    // Do not send events after the activity has been destroyed.
    // dispose() releases everything clear() would, so calling both is redundant.
    disposables.dispose();
}
Bun answered 26/12, 2017 at 4:49 Comment(0)
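The difference between clear() and dispose() on a CompositeDisposable is easy to miss, so here is a minimal sketch of it in plain Java. It does not use RxJava: Handle and Container are hypothetical stand-ins written only to mirror the documented behaviour, where clear() disposes the current contents but leaves the container usable, while dispose() also rejects anything added later.

```java
import java.util.ArrayList;
import java.util.List;

class CompositeSketch {

    /** Hypothetical minimal resource handle, standing in for a Disposable. */
    static final class Handle {
        private boolean disposed;
        void dispose() { disposed = true; }
        boolean isDisposed() { return disposed; }
    }

    /** Hypothetical container modelling clear() versus dispose(). */
    static final class Container {
        private final List<Handle> handles = new ArrayList<>();
        private boolean disposed;

        /** Returns false, and disposes the handle, if the container is already disposed. */
        boolean add(Handle h) {
            if (disposed) {
                h.dispose();
                return false;
            }
            handles.add(h);
            return true;
        }

        /** Disposes everything currently held; the container stays usable. */
        void clear() {
            for (Handle h : handles) {
                h.dispose();
            }
            handles.clear();
        }

        /** Disposes everything currently held and rejects all future additions. */
        void dispose() {
            clear();
            disposed = true;
        }
    }

    /** True if the old handle was disposed and a new one is still accepted after clear(). */
    public static boolean acceptsAfterClear() {
        Container c = new Container();
        Handle first = new Handle();
        c.add(first);
        c.clear();
        return first.isDisposed() && c.add(new Handle());
    }

    /** Whether a handle added after dispose() is accepted (it should not be). */
    public static boolean acceptsAfterDispose() {
        Container c = new Container();
        c.add(new Handle());
        c.dispose();
        return c.add(new Handle());
    }

    public static void main(String[] args) {
        System.out.println("accepts after clear():   " + acceptsAfterClear());
        System.out.println("accepts after dispose(): " + acceptsAfterDispose());
    }
}
```

Under this model, clear() suits a screen that may subscribe again later, while dispose() suits a final teardown such as onDestroy.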
E
4

The short answer is that you have no reason to unsubscribe explicitly, because the observer chain does that for you once it terminates. You can verify this yourself by adding this operator to the chain:

.doOnUnsubscribe( () -> System.out.println("Unsubscribed") )
Eddins answered 24/12, 2017 at 14:5 Comment(1)
Any termination event (error or completion) unsubscribes (unless you are using something like .retry()). - Fructuous
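To illustrate why a terminal event leaves nothing to clean up, including the onError case raised in an earlier comment, here is a minimal plain-Java sketch. It is a toy model, not RxJava code: SimpleObserver and SafeObserver are hypothetical names, and the wrapper only imitates the idea of a subscriber that marks itself unsubscribed after the first onCompleted or onError and ignores anything delivered afterwards.

```java
import java.util.ArrayList;
import java.util.List;

class TerminalEventSketch {

    /** Hypothetical observer with the three RxJava 1 style callbacks. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Wraps an observer and releases the subscription after the first terminal event. */
    static final class SafeObserver<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> actual;
        private boolean unsubscribed;

        SafeObserver(SimpleObserver<T> actual) {
            this.actual = actual;
        }

        boolean isUnsubscribed() {
            return unsubscribed;
        }

        @Override
        public void onNext(T value) {
            if (!unsubscribed) {
                actual.onNext(value);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (unsubscribed) {
                return;
            }
            try {
                actual.onError(e);
            } finally {
                unsubscribed = true; // released even if the callback throws
            }
        }

        @Override
        public void onCompleted() {
            if (unsubscribed) {
                return;
            }
            try {
                actual.onCompleted();
            } finally {
                unsubscribed = true; // released even if the callback throws
            }
        }
    }

    /** Feeds a fixed event sequence through the wrapper and records what the inner observer saw. */
    public static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        SafeObserver<Boolean> safe = new SafeObserver<>(new SimpleObserver<Boolean>() {
            @Override public void onNext(Boolean v) { log.add("next:" + v); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        safe.onNext(true);
        if (fail) {
            safe.onError(new RuntimeException("boom"));
        } else {
            safe.onCompleted();
        }
        safe.onNext(false); // ignored: the wrapper is already unsubscribed
        log.add("unsubscribed:" + safe.isUnsubscribed());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [next:true, completed, unsubscribed:true]
        System.out.println(run(true));  // [next:true, error:boom, unsubscribed:true]
    }
}
```

In this model both paths end with the subscription released, which is the behaviour the answer and comment above describe for a chain without resubscribing operators such as retry().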
M
1

Use take(1) if you want the stream to unsubscribe right after the first item. take(1) passes the first item on, then calls onCompleted and unsubscribes from the source on its own, so no explicit call is needed inside the callback.

Subscription s =
getObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .take(1) // take the first item, then complete and stop emissions
    .subscribe(new Subscriber<Boolean>() {
        @Override
        public void onCompleted() {
            doSomething();
        }

        @Override
        public void onError(Throwable e) {
            thereIsError();
        }

        @Override
        public void onNext(Boolean status) {
            updateView();
        }
    });
Mcclurg answered 13/4, 2020 at 11:59 Comment(0)
