Disposeable for Observer
Asked Answered
S

2

6

In RxJava 1 subscribing with an Observer returned a Subscription which could be unsubscribed.

In RxJava 2 subscribing with an Observer returns void and no Disposeable. How is it possible to stop that "Subscription"?

// v1
rx.Observable<Long> v1hot = rx.Observable.interval(1, TimeUnit.SECONDS);
rx.Observer<Long> v1observer = new TestSubscriber<>();
Subscription subscription = v1hot.subscribe(v1observer);
subscription.unsubscribe();

// v2
Observable<Long> v2hot = Observable.interval(1, TimeUnit.SECONDS);
Observer<Long> v2Observer = new TestObserver<>();
v2hot.subscribe(v2Observer); // void

EDIT: how to handle the case, where we use an observer which doesn't itself implement Disposable, like BehaviorSubject? Like in this example:

// v1
rx.Observable<Long> v1hot = rx.Observable.interval(1, TimeUnit.SECONDS);
rx.Observer<Long> v1observer = rx.subjects.BehaviorSubject.create();
Subscription subscription = v1hot.subscribe(v1observer);
subscription.unsubscribe();

// v2
Observable<Long> v2hot = Observable.interval(1, TimeUnit.SECONDS);
Observer<Long> v2Observer = BehaviorSubject.createDefault(-1L);
v2hot.subscribe(v2Observer); // void
Sessile answered 2/1, 2017 at 15:5 Comment(0)
E
10

All other subscribe methods return a Disposable. In your example, the TestObserver itself implements Disposable, so you can call dispose() on the observer itself to dispose of the subscription.

Otherwise you can use DisposableObserver as a base class for your own custom observers to have the Disposable behavior provided to you by the abstract base class.

EDIT to answer the updated question:

In case you need to use the subscribe(Observer) method (the one returning void), but you need to use an Observer which doesn't implement Disposable, you still have the option to wrap your Observer in a SafeObserver which will provide you with Disposable behavior (among other contract conformance guarantees).

Enchondroma answered 2/1, 2017 at 15:27 Comment(4)
In this specific case that works. But what if it doesn't implement that? Like if its a subject (updated question).Sessile
Thanks! I wonder why they changed that. That's very inconvenient.Sessile
Unfortunately, that's something I can't answer. Maybe the awesome lead developer of RxJava2, Dávid Karnok could be able to elaborate on design decisions, but there's no mechanism in SO to notify him so we can only hope he will check on this question, which he does regularily on RxJava related questions.Hazlitt
Actually, I found this link on RxJava's GitHub page, which provides some reasoning for the changes: it's mostly for compatibility with the Reactive-Streams spec.Hazlitt
R
0

Instead of calling subscribe(Observer) you can call subscribeWith(DisposableObserver) which returns a disposable.

Rowney answered 17/7, 2018 at 9:16 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.