RxJS Continue Listening After Ajax Error

2019-01-22 07:56发布

问题:

RxJs stops listening to click events when an inner observable errors (Ajax request). I'm trying to figure out how to keep the event listener hooked to the button click event and gracefully handle the inner ajax error.

Here is my example code and a link to plunkr

var input = $("#testBtn");
var test = Rx.Observable.fromEvent(input,'click');

var testAjax = function() {
  return Rx.Observable.range(0,3).then(function(x){ 
    if(x==2)throw "RAWR"; //Simulating a promise error.
    return x;
  });
}

test.map(function(){
  return Rx.Observable.when(testAjax());
})
.switchLatest()
.subscribe(
  function (x) {
      console.log('Next: ', x);
  },
  function (err) {
      console.log('Error: ' + err);   
  },
  function () {
      console.log('Completed');   
  });

http://plnkr.co/edit/NGMB7RkBbpN1ji4mfzih

回答1:

You can use the catch operator (or the catchException alias) to catch and handle errors which occur in an observable, so that subscribers are not notified of the error.

Ignore Errors:

return Rx.Observable
    .when(testAjax())
    .catch(Rx.Observable.empty()); // continues with an empty obs after error.

Handle Errors:

var empty = Rx.Observable.return('default value');
return Rx.Observable
    .when(testAjax())
    .catch(function (error) {
        console.log('error:', error);
        return empty;
    });


回答2:

I ran into my own issues interpreting how this (and onErrorResumeNext) functioned. The struggle I encountered was with what context the catch (or resume next) applied to. The simple colloquial translation that makes sense for me is the following:

Given a stream (or observable) of observables, catch (or onErrorResumeNext) will consume the error and allow you to provide one or more observables to continue the original stream with.

The key take away is that your original source is interrupted and replaced with the observable(s) you provide in the catch/onErrorResumeNext function. This means that if you had something like this:

var src = Rx.Observable
    .interval(500)
    .take(10)
    .select(function(x) {
        if (x == 5) {
            return Rx.Observable.throw('Simulated Failure');
        }

        return Rx.Observable.return(x * 2);
    })

Then adding .catch(Rx.Observable.return('N/A')) or .onErrorResumeNext(Rx.Observable.return('N/A')) will not actually just continue your stream (sourced by the interval), but rather end the stream with a final observable (the N/A).

If you are looking to instead handle the failure gracefully and continue the original stream you need to so something more like .select(function(x) { return x.catch(Rx.Observable.return('N/A')); }). Now your stream will replace any observable element in the stream that fails with a caught default and then continue on with the existing source stream.

var src = Rx.Observable
    .interval(500)
    .take(10)
    .select(function(x) {
        if (x == 5) {
            return Rx.Observable.throw('Simulated Failure');
        }

        return Rx.Observable.return(x * 2);
    })
    //.catch(Rx.Observable.return('N/A'))
    //.onErrorResumeNext(Rx.Observable.return('N/A'))
    .select(function(x) { return x.catch(Rx.Observable.return('N/A')); })
    .selectMany(function(x) { return x; });


var sub = src.subscribe(
    function (x) { console.log(x); },
    function (x) { console.log(x); },
    function () { console.log('DONE'); }
);

// OUTPUT:
// 0
// 2
// 4
// 6
// 8
// N/A
// 12
// 14
// 16
// 18
// DONE

Here is a JSFiddle that shows this in action.



回答3:

I was still a little confused after trying the accepted answer, so this is what ended up working for me. This is what I had:

  Rx.Observable.fromEvent(emitter, events.SUBMIT_EVENT)
      .flatMapFirst(email=>{
        return $.ajax({
          method: "POST",
          url: '/email',
          data: {
            email: email
          },
        }).promise()
      })
      .subscribe(response=>{
        if (response) {
          //do your success thing here
            }
          },
         error =>{
           //do your error thing
          }
         )

When the server returned an error (like when the user already entered their email) I wasn't able to listen for the user's email form submit again. This is what worked for me:

  Rx.Observable.fromEvent(emitter, events.SUBMIT_EVENT)
      .flatMapFirst(email=>{
        return $.ajax({
          method: "POST",
          url: '/email',
          data: {
            email: email
          },
        }).promise()
      })
      .doOnError(error=> {
        //do your error thing here
      })
      .retry()
      .subscribe(response=>{
        if (response) {
          //do your success thing here
        }
      })