How to unsubscribe an RxJS subscription inside the subscribe method?
S

3

18

I have some TypeScript:

this.mySubscription = someObservable.subscribe((obs: any) => {
   this.mySubscription.unsubscribe();
   this.mySubscription = undefined;
});

On execution, the console logs the error ERROR TypeError: Cannot read property 'unsubscribe' of undefined. I wonder why I cannot unsubscribe inside the subscribe lambda function. Is there a correct way to do so? I have read a bit about using dummy subjects and completing them, or using takeUntil/takeWhile and other pipe operators as workarounds.

What is a correct way/workaround to unsubscribe a subscription inside the subscription's subscribe-function?

I am currently using a dummy subscription like so:

mySubscription: BehaviorSubject<any> = new BehaviorSubject<any>(undefined);


// when I do the subscription:
dummySubscription: BehaviorSubject<any> = new BehaviorSubject<any>(this.mySubscription.getValue());
this.mySubscription = someObservable.subscribe((obs: any) => {
    // any work...
    dummySubscription.next(obs);
    dummySubscription.complete();
    dummySubscription = undefined;
}, error => {
    dummySubscription.error(error);
});

dummySubscription.subscribe((obs: any) => {
    // do the actual work here when mySubscription emits a value, i.e. the value after which it should have been unsubscribed
}, err => {
    // if errors need be
});
Scorekeeper answered 14/10, 2020 at 9:52 Comment(0)
B
37

You shouldn't try to unsubscribe in the subscribe function.
You can unsubscribe with operators like take, takeWhile or takeUntil.

take

Use take(n) to unsubscribe after someObservable emits n times.

someObservable.pipe(
  take(1)
).subscribe(value => console.log(value));

takeWhile

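Use takeWhile to unsubscribe when an emitted value fails a condition. By default the failing value itself is not passed on to subscribe; since RxJS 6.4, takeWhile(predicate, true) emits it as well before completing.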

someObservable.pipe(
  takeWhile(value => valueIsSafe(value))
).subscribe(value => console.log(value));

function valueIsSafe(value: any): boolean {
  // return true if the subscription should continue
  // return false if you want to unsubscribe on that value
}

takeUntil

Use takeUntil(obs$) to unsubscribe when the observable obs$ emits.

const terminate = new Subject<void>();

someObservable.pipe(
  takeUntil(terminate)
).subscribe(value => console.log(value));

function unsub() {
  terminate.next(); // trigger unsubscribe
}
Berrie answered 14/10, 2020 at 10:44 Comment(4)
The condition of when to unsubscribe is complex and depends on the emitted value. - Scorekeeper
@Scorekeeper Sounds like takeWhile should work for you. - Berrie
While testing out the different rxjs pipe operators, I got confused by accidentally subscribing multiple times. However, takeWhile did what I was looking for. - Scorekeeper
takeWhile is what I was looking for, thank you frido. - Cuxhaven
G
2

If you make your stream asynchronous, what you're doing should work. For example, this will not work:

const sub = from([1,2,3,4,5,6,7,8,9,10]).subscribe(val => {
  console.log(val);
  if(val > 5) sub.unsubscribe();
});

but this will work:

const sub2 = from([1,2,3,4,5,6,7,8,9,10]).pipe(
  delay(0)
).subscribe(val => {
  console.log(val);
  if(val > 5) sub2.unsubscribe();
});

Because the JS event loop is fairly predictable (blocks of code always run to completion), if any part of your stream is asynchronous, you can be sure that your subscription will be defined before your lambda callback is invoked.
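
To make the timing concrete, here is a minimal sketch that does not use RxJS at all. The names Source, syncSource, asyncSource and probe are hypothetical stand-ins invented for this illustration. It only records whether the subscription variable has already been assigned at the moment each value arrives.

```typescript
// Hypothetical stand-ins for an observable source; this is not RxJS code.
type Unsubscribable = { unsubscribe(): void };
type Source = (next: (value: number) => void) => Unsubscribable;

// Emits 1..3 synchronously, i.e. before the subscribe call has returned.
const syncSource: Source = (next) => {
  [1, 2, 3].forEach((value) => next(value));
  return { unsubscribe() {} };
};

// Emits 1..3 from a later task, i.e. after the subscribe call has returned.
const asyncSource: Source = (next) => {
  const timer = setTimeout(() => [1, 2, 3].forEach((value) => next(value)), 0);
  return { unsubscribe: () => clearTimeout(timer) };
};

// Records, for every emission, whether `sub` was already assigned.
function probe(source: Source): boolean[] {
  const seen: boolean[] = [];
  let sub: Unsubscribable | undefined;
  sub = source(() => seen.push(sub !== undefined));
  return seen;
}

const syncSeen = probe(syncSource);   // filled immediately
const asyncSeen = probe(asyncSource); // filled once the timer fires
```

With the synchronous source, every entry in syncSeen is false: the callback runs before the assignment to sub has completed, which is the situation behind the "of undefined" error in the question. With the asynchronous source, asyncSeen is still empty right after probe returns and holds only true entries once the timer has fired.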

Should you do this?

Probably not. If your code relies on the internal (otherwise hidden) machinations of your language/compiler/interpreter/etc, you've created brittle code and/or code that is hard to maintain. The next developer looking at your code is going to be confused as to why there's a delay(0) - that looks like it shouldn't do anything.

Notice that in subscribe(), your lambda has access to its closure as well as the current stream variable. The takeWhile() operator has access to the same closure and the same stream variables.

from([1,2,3,4,5,6,7,8,9,10]).pipe(
  takeWhile(val => {
    // add custom logic
    return val <= 5;
  })
).subscribe(val => {
  console.log(val);
});

takeWhile() can do anything that sub = subscribe(... sub.unsubscribe() ...) can, and has the added benefit of not requiring you to manage a subscription object and being easier to read/maintain.
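
As a rough illustration of why an operator can stop even a synchronous source, here is a deliberately simplified model. It is not how RxJS is implemented, and MiniSubscriber, MiniObservable, fromArray and takeWhileSketch are hypothetical names made up for this sketch. The point is that the operator sits between source and consumer, so it can close the stream from inside the chain without needing the value returned by subscribe.

```typescript
// Hypothetical miniature model of a subscriber and an observable; not RxJS.
interface MiniSubscriber<T> {
  closed: boolean;
  next(value: T): void;
}
type MiniObservable<T> = (subscriber: MiniSubscriber<T>) => void;

// Synchronous source that checks the subscriber's closed flag before each value.
function fromArray<T>(values: T[]): MiniObservable<T> {
  return (subscriber) => {
    for (const value of values) {
      if (subscriber.closed) return; // stop as soon as the consumer has closed
      subscriber.next(value);
    }
  };
}

// takeWhile-like wrapper: forwards values while the predicate holds, then closes.
function takeWhileSketch<T>(predicate: (value: T) => boolean) {
  return (source: MiniObservable<T>): MiniObservable<T> => (downstream) => {
    const upstream: MiniSubscriber<T> = {
      closed: false,
      next(value) {
        if (predicate(value)) {
          downstream.next(value);
        } else {
          upstream.closed = true; // ends the stream from inside the chain
        }
      },
    };
    source(upstream);
  };
}

const received: number[] = [];
const limited = takeWhileSketch<number>((val) => val <= 5)(
  fromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
);
limited({ closed: false, next: (val) => received.push(val) });
```

After this runs, received contains 1 through 5, even though the source emitted everything synchronously and no subscription object was ever handed back to the caller.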

Guck answered 14/10, 2020 at 15:54 Comment(0)
A
2

Inspired by another answer here and especially this article, https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87, I'd like to suggest takeUntil() with the following example:

...
    const stop$: Subject<void> = new Subject<void>(); // emitting on this subject stops the observable (an unsubscribe-like mechanism)

    obs$
      .pipe(
        takeUntil(stop$)
      )
      .subscribe(res => {
        if ( res.something === true ) {
          // These next two lines will cause the subscription to stop
          stop$.next();
          stop$.complete();
        }

      });
...

And I'd like to quote the title of the article mentioned above: "RxJS: Don’t Unsubscribe" :).

Aec answered 6/3, 2022 at 3:33 Comment(0)
