RxJs Observable interval until reached desired value

Posted 2019-09-13 01:52

Question:

I want to poll for changes, and when a desired value is reached the Observable should complete (or give up after a timeout). Right now I use filter, which works fine for waiting until the desired value is reached. But I want the Observable to emit events while waiting for this value.

For example, I wait for the status 'success', and until the status changes to 'success' my service returns the status 'testing'. But since the filter only lets 'success' through, 'testing' is never emitted.

My code right now:

return Observable
  .interval(this.POLL_TIMEOUT)
  .flatMap(() => this.getSingleProjectStatus(projectId, repoName))
  .filter(data => this.finishedStatus(data.status))
  .take(1)
  .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')));

Answer 1:

You probably want takeWhile instead of filter.

return Observable
  .interval(this.POLL_TIMEOUT)
  .flatMap(() => this.getSingleProjectStatus(projectId, repoName))
  // keep emitting while the status is NOT yet finished; complete on the first finished one
  .takeWhile(data => !this.finishedStatus(data.status))
  .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')));

Note that the above emits every event except the last one. If you want the last event too, you'll need to be a little trickier.

  const source = Observable.interval(this.POLL_TIMEOUT)
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName))
    .share();

  source
    .takeUntil(source.filter(data => this.finishedStatus(data.status)))
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')));

In this case you take all the results until another Observable emits. The other Observable here is simply the source, filtered so that it only emits success events.
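
To make the difference between the approaches concrete, here is a small dependency-free sketch. It does not use RxJS at all: a plain array stands in for the values the polling Observable would emit, and the helper names (isFinished, filterTakeOne, takeWhileNotFinished, takeUntilFinishedInclusive) are made up for illustration, with isFinished as a guess at what finishedStatus from the question does.

```javascript
// Model only: an array stands in for the sequence of polled statuses.
// isFinished is a hypothetical stand-in for this.finishedStatus in the question.
const isFinished = (status) => status === 'success';

// Models filter(...).take(1): only the first finished status gets through.
function filterTakeOne(statuses) {
  const hit = statuses.find(isFinished);
  return hit === undefined ? [] : [hit];
}

// Models takeWhile(not finished): everything before the first finished
// status is kept, but that finished status itself is dropped.
function takeWhileNotFinished(statuses) {
  const end = statuses.findIndex(isFinished);
  return end === -1 ? statuses.slice() : statuses.slice(0, end);
}

// Models the "include the last event" goal: same as above, but the first
// finished status is kept as the final value.
function takeUntilFinishedInclusive(statuses) {
  const end = statuses.findIndex(isFinished);
  return end === -1 ? statuses.slice() : statuses.slice(0, end + 1);
}

const polled = ['testing', 'testing', 'success', 'testing'];
console.log(filterTakeOne(polled));              // ['success']
console.log(takeWhileNotFinished(polled));       // ['testing', 'testing']
console.log(takeUntilFinishedInclusive(polled)); // ['testing', 'testing', 'success']
```

The third result is what the takeUntil variant is aiming for. Whether a takeUntil on a shared source actually delivers that final value can depend on the order in which the library subscribes to the source and to the notifier, so it is worth checking against the RxJS version in use. If the project is on RxJS 6.4 or later, takeWhile accepts a second inclusive argument that should give the same result without the extra shared source.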

JsBin: http://jsbin.com/sojosuhune/edit?html,js,console,output