rxjs timeout to first value

2020-08-17 17:10发布

问题:

so as I understood from this question here I understood that timeout operator errors if an observable does not emit any value in the given window of time... the problem for me is, that this window of time resets after each emit, making it necessary to complete the sequence if you are only interested if the first value(s) emit within the window...

is there a nice way to have a "timeout to first"? Other than .take(1).timeout(1000)?

回答1:

In addition to @Maxime's answer, you can use race to create a race between your observable's first value and a timeout. We construct the timeout by combining never with timeout.

Thus we end up with a race between your source observable, and an observable that will never emit a value, but will throw an error after time goes by. If your source observable produces its first value, then race will stop listening to the timeout observable.

const timed = source.race(Observable.never().timeout(1000));


回答2:

There's no timeout for first only as far as I'm aware and here's a way of doing it:
(Plunkr available here)

const { Observable } = Rx;

// this function can be use with the let operator
// ex: someObs$.let(timeoutFirstOnly(100)).subscribe(...)
// it'll emit an error if the first value arrives later
// than timeout parameter value
const timeoutFirstOnly = timeout => observable$ => {
  const obs$ = observable$.share();

  const [firstValue$, others$] = obs$.partition((value, index) => index === 0);

  return Observable.merge(firstValue$.timeout(timeout).first(), others$);
};

const created = Rx.Observable.create(observer => {
  // here, this is working
  // try to emit at 150ms for example and it'll throw an error
  setTimeout(() => observer.next(42), 150);
});

created
  .let(timeoutFirstOnly(100))
  .subscribe(
    val => console.log(`received ${val}`),
    error => console.log(`threw ${error}`)
  );

But, I felt there might be another way so I asked on Gitter and @Dorus gave me a better result:

created.publish(src =>
  Rx.Observable.merge(
    src.take(1).timeout(100),
    src.skip(1)
  )
);

Explanation here, all credit goes to @Dorus on this one:

publish has 2 flavours:

  1. .publish() : ConnectableObservable<T>
  2. .publish(selector : (src : Observable<T>) => Observable<R>): Observable<R>

.publish() returns a ConnectableObservable and can be used either by calling refCount on it or by calling connect on it yourself. .publish(selector) let you use the multicasted source in the selector, and emit the result. By the way, .publish() is the same as .multicast(() => new Subject()) and .publish(selector) is the same as .multicast(() => new Subject(), selector).

Also .publish().refCount() is pretty much the same as .share(). You are correct to call share before partition. That solves the problem partially. I still prefer .publish(selector) because it's safer: .publish(selector) will run the selector and subscribe to all sources before it subscribes to the source observable. With share, the first subscription will activate the source, and anything emitted sync will be missed by the second subscription. This could be as worse as using an total sync source and then the second subscription would rerun the source completely.

One last addition: .publish(selector) has the same signature as .let(selector), but let does not multicast the source. That's the difference.