RxJS Continue Listening After Ajax Error
Asked Answered
A

3

20

RxJs stops listening to click events when an inner observable errors (Ajax request). I'm trying to figure out how to keep the event listener hooked to the button click event and gracefully handle the inner ajax error.

Here is my example code and a link to plunkr

var input = $("#testBtn");
var test = Rx.Observable.fromEvent(input,'click');

var testAjax = function() {
  return Rx.Observable.range(0,3).then(function(x){ 
    if(x==2)throw "RAWR"; //Simulating a promise error.
    return x;
  });
}

test.map(function(){
  return Rx.Observable.when(testAjax());
})
.switchLatest()
.subscribe(
  function (x) {
      console.log('Next: ', x);
  },
  function (err) {
      console.log('Error: ' + err);   
  },
  function () {
      console.log('Completed');   
  });

http://plnkr.co/edit/NGMB7RkBbpN1ji4mfzih

Agrapha answered 21/7, 2014 at 1:45 Comment(2)
For what purpose are you using when and then? It doesn't look like it's buying you anything...Speculum
This was done just as an example of faking an ajax request. A mockjax example is provided here. plnkr.co/edit/UtReTejs524rX2DKbIoj?p=previewAgrapha
S
22

You can use the catch operator (or the catchException alias) to catch and handle errors which occur in an observable, so that subscribers are not notified of the error.

Ignore Errors:

return Rx.Observable
    .when(testAjax())
    .catch(Rx.Observable.empty()); // continues with an empty obs after error.

Handle Errors:

var empty = Rx.Observable.return('default value');
return Rx.Observable
    .when(testAjax())
    .catch(function (error) {
        console.log('error:', error);
        return empty;
    });
Speculum answered 21/7, 2014 at 17:4 Comment(7)
THANK YOU!. I was like 90% there. I figured out that running catch() was what I needed to run on the inner observable but couldn't figure out what the return value should be. Ultimately Rx.Observable.Empty() was what I was missing. Here is an updated plunker plnkr.co/edit/UtReTejs524rX2DKbIoj?p=previewAgrapha
+1 for eventually reaching the answer on your own. :) Also, if in doubt, just create an observable: Observable.create. Just make sure you keep it hygienic.Speculum
If I put the catch() on the outer Observable, after the flatMapLatest it will complete the subscription. This only appears to work if I handle all exceptions on the inner observables. Any suggestions?Agrapha
Right on. flatMapLatest has no mechanism for handling errors from the observables it subscribes to, so you have a couple of options. 1) don't let those errors "bubble" to the flatMapLatest, or 2) create a "type" that can house either a value OR an error, and let flatMapLatest "bubble" both of those through via onNext. On the subscriber side, you can "pattern match" for that type, and decide what to do with the error/value.Speculum
At first I did not understand that this catch operator should be in the "inner" Observable, returned from the loading function to switchMap of main Observable, which we want to not be stopped on errors. I tried this catch on main Observable and it obviously did not work. :) Maybe you should include the "full" example in the answer.Parmenter
What if I want subscribers to be notified of the error and the stream to continue normally?Belldas
@Belldas that's application-level functionality. one way of achieving this would be to use an observable that yields type Response = { value: SomeValueType } | { error: Error } instead of just value. This upgrades the error to something your application can consume normally, just as you might handle an xml request error.Speculum
P
6

I ran into my own issues interpreting how this (and onErrorResumeNext) functioned. The struggle I encountered was with what context the catch (or resume next) applied to. The simple colloquial translation that makes sense for me is the following:

Given a stream (or observable) of observables, catch (or onErrorResumeNext) will consume the error and allow you to provide one or more observables to continue the original stream with.

The key take away is that your original source is interrupted and replaced with the observable(s) you provide in the catch/onErrorResumeNext function. This means that if you had something like this:

var src = Rx.Observable
    .interval(500)
    .take(10)
    .select(function(x) {
        if (x == 5) {
            return Rx.Observable.throw('Simulated Failure');
        }

        return Rx.Observable.return(x * 2);
    })

Then adding .catch(Rx.Observable.return('N/A')) or .onErrorResumeNext(Rx.Observable.return('N/A')) will not actually just continue your stream (sourced by the interval), but rather end the stream with a final observable (the N/A).

If you are looking to instead handle the failure gracefully and continue the original stream you need to so something more like .select(function(x) { return x.catch(Rx.Observable.return('N/A')); }). Now your stream will replace any observable element in the stream that fails with a caught default and then continue on with the existing source stream.

var src = Rx.Observable
    .interval(500)
    .take(10)
    .select(function(x) {
        if (x == 5) {
            return Rx.Observable.throw('Simulated Failure');
        }

        return Rx.Observable.return(x * 2);
    })
    //.catch(Rx.Observable.return('N/A'))
    //.onErrorResumeNext(Rx.Observable.return('N/A'))
    .select(function(x) { return x.catch(Rx.Observable.return('N/A')); })
    .selectMany(function(x) { return x; });


var sub = src.subscribe(
    function (x) { console.log(x); },
    function (x) { console.log(x); },
    function () { console.log('DONE'); }
);

// OUTPUT:
// 0
// 2
// 4
// 6
// 8
// N/A
// 12
// 14
// 16
// 18
// DONE

Here is a JSFiddle that shows this in action.

Plateau answered 23/9, 2015 at 22:35 Comment(2)
What does the selectMany(function(x) { return x; }) do?Senatorial
this strategy produces an observable event stream of observables. The selectMany (or flatMap) expands the stream so that it becomes just an observable stream of results (strings in this case) by unwrapping the inner observable.Plateau
D
4

I was still a little confused after trying the accepted answer, so this is what ended up working for me. This is what I had:

  Rx.Observable.fromEvent(emitter, events.SUBMIT_EVENT)
      .flatMapFirst(email=>{
        return $.ajax({
          method: "POST",
          url: '/email',
          data: {
            email: email
          },
        }).promise()
      })
      .subscribe(response=>{
        if (response) {
          //do your success thing here
            }
          },
         error =>{
           //do your error thing
          }
         )

When the server returned an error (like when the user already entered their email) I wasn't able to listen for the user's email form submit again. This is what worked for me:

  Rx.Observable.fromEvent(emitter, events.SUBMIT_EVENT)
      .flatMapFirst(email=>{
        return $.ajax({
          method: "POST",
          url: '/email',
          data: {
            email: email
          },
        }).promise()
      })
      .doOnError(error=> {
        //do your error thing here
      })
      .retry()
      .subscribe(response=>{
        if (response) {
          //do your success thing here
        }
      })
Dapsang answered 9/11, 2015 at 0:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.