Why Observable.race not working if one of observab

Posted 2019-08-09 03:41

Question:

I'd like to implement WebSocket reconnection in a web app when the internet connection is lost. To detect a lost connection I use a ping-pong approach: the client sends a ping message and the server replies with a pong message.

When the web app loads, I send an initial ping message and start listening for the reply on the socket, something like this:

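this.websocket.onmessage = (evt) => {
      try {
        const websocketPayload: any = JSON.parse(evt.data);

        // The server answers every ping with a payload whose pong field is 1
        if (websocketPayload.pong !== undefined && websocketPayload.pong == 1) {
          this.pingPong$.next('pong');
        }
      } catch (e) {
        // JSON.parse throws on malformed payloads
        console.error("[ping-pong]: cannot parse message", e);
      }
};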

A pong means that the internet connection looks OK and we can continue. I also have the following code:

 Observable.race(
      // emits 'timeout' every 5 seconds, but only if this source wins the race
      Observable.of('timeout').delay(5000).repeat(),
      this.pingPong$
    ).subscribe((data) => {
      console.log("[ping-pong]:", data);
      if (data === 'pong') {
        // wait 5 seconds, then send the next ping
        Observable.interval(5000).take(1).subscribe(() => {
          console.log("[ping-pong]:sending ping");
          this.send({ping:1});
        });
      } else if (data === 'timeout') {
        // show reconnect screen and start reconnect
        console.error("It looks like websocket connection lost");
      }
    });

But when the this.pingPong$ subject stops emitting events (.next() is never called, because no response arrives once I break the connection manually), I expected Observable.race to start emitting from this observable:

Observable.of('timeout').delay(5000).repeat()

But my subscribe callback is never called once this.pingPong$ stops emitting.

Why?

Thank you

Answer 1:

race subscribes to all of its source Observables, picks the first one that emits, and stays subscribed to that one only.

So if this.pingPong$ emits first and then stops, it makes no difference: race stays subscribed to this.pingPong$, and the other Observables don't matter any more. You might want to take only one value and then repeat the whole process. For example, like the following:

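// pipeable operators, available since RxJS 5.5
import { take, repeat } from 'rxjs/operators';

Observable.race(
    Observable.of('timeout').delay(5000).repeat(),
    this.pingPong$
  )
  .pipe(
    take(1), // complete the chain after the first value, whichever source wins
    repeat() // resubscribe after take(1) completes the chain, starting a new race
  )
  .subscribe(...);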

Obviously it mostly depends on what you want to do but I hope you get the point.
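
To see the difference between the two variants without running a real socket, here is a small simulation in plain TypeScript. It is only a simplified model of the behaviour described above, not RxJS itself: the names `Emission`, `race` and `raceTakeOneRepeat` are made up for this sketch, pongs are given as a list of timestamps in milliseconds, and the timeout source is assumed to fire every `timeoutMs` after each subscription.

```typescript
// A timestamped emission; `at` is milliseconds since the start of the simulation.
interface Emission {
  at: number;
  value: string;
}

// Simplified model of race(timeout$, pingPong$) subscribed at time `start`:
// whichever source emits first wins, and only that source is mirrored until `end`.
function race(start: number, end: number, pongs: number[], timeoutMs: number): Emission[] {
  // pingPong$ is a Subject, so only pongs after the subscription time are seen.
  const firstPong = pongs.find((t) => t > start);
  const firstTimeout = start + timeoutMs;

  if (firstPong !== undefined && firstPong <= firstTimeout) {
    // The pong source wins: the timeout source is dropped for good.
    return pongs
      .filter((t) => t > start && t <= end)
      .map((t) => ({ at: t, value: "pong" }));
  }

  // The timeout source wins: it repeats every `timeoutMs`.
  const out: Emission[] = [];
  for (let t = firstTimeout; t <= end; t += timeoutMs) {
    out.push({ at: t, value: "timeout" });
  }
  return out;
}

// Simplified model of race(...).pipe(take(1), repeat()):
// keep only the first emission of each race, then start a new race from that moment.
function raceTakeOneRepeat(end: number, pongs: number[], timeoutMs: number): Emission[] {
  const out: Emission[] = [];
  let start = 0;
  while (true) {
    const round = race(start, end, pongs, timeoutMs);
    if (round.length === 0) {
      break; // nothing more happens before `end`
    }
    out.push(round[0]);
    start = round[0].at; // the next race is subscribed when this one completes
  }
  return out;
}

// Two pongs arrive, then the connection is cut. Observe for 20 seconds.
const pongTimes = [1000, 5500];
console.log(race(0, 20000, pongTimes, 5000).map((e) => e.value));
// prints [ 'pong', 'pong' ]  (no timeout is ever reported)
console.log(raceTakeOneRepeat(20000, pongTimes, 5000).map((e) => e.value));
// prints [ 'pong', 'pong', 'timeout', 'timeout' ]
```

In the plain race the pong source wins the first and only race, so the timeout can never show up. With a new race after every value, the first timeout is reported 5 seconds after the last pong (at 10500 ms in this run).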