Observable stops firing even when catching the error
Asked Answered
Z

2

7

I'm facing with a very strange behavior on my project, I have a simple Angular service with the below code:

seatClick$ = new Subject<Seat>();

and a method on the service that fires the observable:

  handleSeatClick(seat: Seat) {
    this.seatClick$.next(seat);
  }

the observable logic is simple:

this.seatClick$.pipe(
    exhaustMap((seat: Seat) => {
       this.someFunctionThatThrowsException(); // this function throws ref exception
      return of(null);
    })
    , catchError(err => {
        console.log('error handled');
        return of(null);
    })
)
.subscribe(() => {
    console.log('ok');
  },
  (error1 => {
    console.log('oops');
  })
);

This is really strange, when "someFunctionThatThrowsException" is called it throws some ReferenceError exception, this exception is then catched with the catchError and the next() event is fired.

However, from this moment on the seatClick observable stops responding, as if it was completed, calling handleSeatClick on the service won't respond any more.

What am I missing here?

Zamia answered 18/5, 2020 at 10:20 Comment(1)
Could you please show the implementation of this.someFunctionThatThrowsException()?Billman
C
6

That's correct behavior, you need repeat operator here to resubscribe.

this.seatClick$.pipe(
    exhaustMap((seat: Seat) => {
       this.someFunctionThatThrowsException();
       return of(null);
    })

    // in case of an error the stream has been completed.
    , catchError(err => {
        console.log('error handled');
        return of(null);
    })

    // now we need to resubscribe again
    , repeat() // <- here
)
.subscribe(() => {
    console.log('ok');
  },
  (error1 => {
    console.log('oops');
  })
);

also if you know that something might fail you can dedicate it to an internal stream and use catchError there, then you don't need repeat.

this.seatClick$.pipe(
  // or exhaustMap, or mergeMap or any other stream changer.
  switchMap(seal => of(seal).pipe(
    exhaustMap((seat: Seat) => {
       this.someFunctionThatThrowsException();
       return of(null);
    })
    , catchError(err => {
        console.log('error handled');
        return of(null);
    })
  )),
  // now switchMap always succeeds with null despite an error happened inside
  // therefore we don't need `repeat` outside and our main stream
  // will be completed only when this.seatClick$ completes
  // or throws an error.
)
.subscribe(() => {
    console.log('ok');
  },
  (error1 => {
    console.log('oops');
  })
);
Complaisance answered 18/5, 2020 at 11:8 Comment(16)
Thanks, why is it the correct behavior? if no exception is thrown it works, shouldn't the catchError jandle the error and keep the observable alive?Zamia
in rx an error completes the stream. when you catch an error in catchError the parent stream has been already closed and as an option you can switch current stream to another one, because of completes right after null the whole stream has been completed and subscribe won't get any thing from it, you can add 3rd callback to see when the stream has been completed (right after null). repeat listens on completion and resubscribes so the subscribe, in case of an error it's like a brand new subscription.Complaisance
imagine that you need to continue in error1 => console.log('oops') - it's impossible because the stream has been closed and you need a workaround to resubscribe.Complaisance
@satanTime: I understand there isn't any difference b/n retry() and repeat() here, but wouldn't retry() be more semantically correct since the resubscription is after an error?Billman
depends on your requirements, if you expect a stream to fail and give it N attempts to succeed retry is the right solution. If you want stream always to be active then you need repeat, also retry works only in case of an error, on succeeded complete it won't resubscribe as I understand.Complaisance
@sataTime thanks, it makes sense, but something is still unclear, so basically if i have nested observable called by seatClick$ and i want to make sure seatClick$ is always kept alive i can avoid catching errors of all nested observable and just make sure to catch the error on seatClick$ and call repeat? and one last question, since obviously your way familiar with observable, is this type of design works? i mean setting up an subject observable, subscribing and firing next from a method? since with complex and nested observable the code can be quite complex, thank!!Zamia
about the first question yes - also I'll update the answer to show you how you can skip repeat and catch errors, but it requires switchMap. about the second one, it is normal, you can even skip the method and use it in the template (click)="seatClick$.next(seat)"Complaisance
Just a note - if both of them in the same component then you don't need rxjs at all, simply call this.someFunctionThatThrowsException() in the click handler.Complaisance
thanks again, about using switchMap, its problematic since i used exhaustMap to ensure the source observable seatClick$, wont handle any requests while the inner observable is processing , i think switchMap will "remember" and emit the values when exhaustMap completes.Zamia
you can use exhaustMap instead of switchMap for this pattern too, the main goal is to have an error outside of the main stream, then you can simply catch it without repeat.Complaisance
ok, so exhaustMap inside exhaustMap just to handle the stream.Zamia
Hi @TomerMiz, is there any subject we need to solve to make the answer acceptable?Complaisance
No you have been great, my only concern is that i am not able to find, and i have tried, a single tutorial or an article or an example that uses the same concept i have used in my question, meaning creating subscribing to a subject in my service, adding pipes to the subject to handle some business logic in the service constructor and invoking it from a function using .next, in my question below i wanted to avoid calling the same function twice untill its completed, so i created a subject with exahustmap but again seems weird no one else works like that??Zamia
you can try share() or shareReplay(1), it protects parent stream and triggers it only one. I would say - find free time, go to rxjs doc, try every function they have, understand how they work, repeat once a quarter or half a year. rxjs is cool but in the same time isn't easy and requires expertise and experience.Complaisance
Thanks, its not that i dont understand, its just that i couldnt dind any tutorial or example online that uses the approach i am using with subject in services. Do you if this design pattern has a name i should search for ?Zamia
yeah, that's not what I meant :) rxjs is tough, that's it. you can check different implementations here: learnrxjs.io/learn-rxjs/recipes, not all of them are good, but still it worth checking.Complaisance
F
0

A good alternative to using the repeat() operator is nesting the error handling in the inner pipeline. There it is totally fine to - this observable is supposed to terminate anyway.

this.seatClick$.pipe(
  exhaustMap((seat: Seat) => {
    // We moved the error handling down one level.
    this.someFunctionThatThrowsException().pipe(
      catchError(err => {
        console.log('error handled');
        return of(null);
      }),
    );
    return of(null);
  }),
).subscribe());
Fugate answered 15/12, 2021 at 14:23 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.