Angular 2 - How to change the interval of an RxJS

Posted 2020-06-21 06:01

Question:

I'm using the RxJS Observable interval to periodically refresh the data being fetched. I can't figure out how to change the interval once the stream is running. I've seen suggestions to use the Subject class provided by RxJS, but I can't get it to work.

I provided a simplified example in this plunk.

In the AppComponent I have this method.

getTime() {
  this.timeService.getTime(this.refreshInterval)
    .subscribe(t => {
      this.currentTime = t;
      console.log('Refresh interval is: ' + this.refreshInterval);
    });
}

And in the service I currently have this code.

getTime(refreshInterval: number) {
  return Observable.interval(refreshInterval)
    .startWith(0)
    .map((res: any) => this.getDate())
    .catch(this.handleError);
}

It would be great if someone could provide a working example!

Answer 1:

As I understand from your plnkr, your goal is to let the user modify the timer interval.

You expect that changing refreshInterval will change the RxJS stream that has already been declared:

    this.timeService.getTime(this.refreshInterval)
      .subscribe(t => {
        this.currentTime = t;
        console.log('Refresh interval is: ' + this.refreshInterval);
      }
    );

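This does not work: refreshInterval is read once, when getTime() is called, so later changes never reach the running stream.

Every time you update refreshInterval, you need to:

  • unsubscribe from (or otherwise destroy) the previous stream;
  • create a new stream and subscribe again.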
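
A minimal sketch of that unsubscribe-and-resubscribe cycle, written against RxJS 6 or later with pipeable operators (an assumption; the question uses the older chained style). TimePoller and its onTick callback are hypothetical names introduced for illustration, and the free function getTime stands in for the question's service method.

```typescript
import { Observable, Subscription, interval } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// Stand-in for the question's TimeService.getTime(refreshInterval).
function getTime(refreshInterval: number): Observable<string> {
  return interval(refreshInterval).pipe(
    startWith(0),                      // emit once immediately on subscribe
    map(() => new Date().toString())
  );
}

// Hypothetical helper that owns the single active subscription.
class TimePoller {
  currentTime = '';
  private subscription?: Subscription;

  constructor(private onTick: (time: string) => void = () => {}) {}

  // Call once on startup and again whenever the user changes the interval.
  start(refreshInterval: number): void {
    this.stop();                       // tear down the previous timer first
    this.subscription = getTime(refreshInterval).subscribe(t => {
      this.currentTime = t;
      this.onTick(t);
    });
  }

  // Also call this when the owning component is destroyed.
  stop(): void {
    if (this.subscription) {
      this.subscription.unsubscribe();
      this.subscription = undefined;
    }
  }
}
```

In the component, a change handler for the interval input would then call poller.start(newInterval), and ngOnDestroy would call poller.stop().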


Answer 2:

You do not need to destroy and recreate the entire Observable stream to change the refreshInterval. You only need to renew the part of the stream that depends on the changing interval.

First simplify your service's getTime() so it's not in charge of determining the frequency of outputs. All it does is return the time:

getTime() { return (new Date()).toString(); }

Now the calling code will determine the schedule. Just 3 simple steps:

1. A source function that adjusts to the desired interval:

/** Observable waits for the current interval, then emits once */
refreshObs() { return Observable.timer(this.refreshInterval); }

2. An observable chain that uses the repeat operator to continuously re-execute the stream:

getTime$ = Observable.of(null)
            .switchMap(e=>this.refreshObs()) // wait for interval, then emit
            .map(() => this.timeService.getTime()) // get new time
            .repeat(); // start over

3. A subscription to trigger the whole thing:

ngOnInit(){
    this.getTime$.subscribe(t => {
        this.currentTime = t;
        console.log('refresh interval = '+this.refreshInterval);
    });
}

This works because refreshObs() returns a new observable each time the stream is repeated, and that new observable will wait according to the currently set interval before emitting.
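
The same idea in RxJS 6+ pipeable form, as a sketch rather than the answer's exact code: defer() re-runs its factory on every resubscription, so each cycle started by repeat() reads the interval afresh. pollWithLiveInterval, getInterval, getValue, and the optional scheduler parameter are hypothetical additions; the demo uses VirtualTimeScheduler only so that it finishes instantly instead of waiting on real timers.

```typescript
import {
  Observable, SchedulerLike, VirtualTimeScheduler, asyncScheduler, defer, timer
} from 'rxjs';
import { map, repeat, take } from 'rxjs/operators';

// Waits for the *current* interval, emits one value, then starts over.
function pollWithLiveInterval<T>(
  getInterval: () => number,           // read again before every wait
  getValue: () => T,                   // e.g. () => timeService.getTime()
  scheduler: SchedulerLike = asyncScheduler
): Observable<T> {
  return defer(() => timer(getInterval(), scheduler)).pipe(
    map(() => getValue()),
    repeat()                           // resubscribe, which re-runs the defer factory
  );
}

// Demo under virtual time: the interval shrinks after the first tick.
const scheduler = new VirtualTimeScheduler();
let refreshInterval = 1000;
const emittedAt: number[] = [];

pollWithLiveInterval(() => refreshInterval, () => scheduler.now(), scheduler)
  .pipe(take(3))                       // bound the otherwise endless stream
  .subscribe(t => {
    emittedAt.push(t);
    refreshInterval = 250;             // simulates the user changing the setting
  });

scheduler.flush();                     // run all scheduled virtual timers
console.log(emittedAt);                // [1000, 1250, 1500]
```

Note that a change only takes effect at the start of the next cycle: a wait that is already in progress still runs to completion with the old value.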

Answer 3:

I wanted to build on the previous answers here (and elsewhere on Stack Overflow). My example has a shared RefreshService that various components can subscribe to. This way, a site can have a single "Refresh every X seconds" control that drives the refresh of every subscribed component.

https://plnkr.co/edit/960yztjl3dqXQD2XPSei?p=preview

The service exposes a withRefresh() method that returns an Observable. It relies on BehaviorSubject, which emits its current value to each new subscriber immediately, so the timer starts as soon as a component subscribes.

export class RefreshService {

  static interval$: BehaviorSubject<number> = new BehaviorSubject<number>(30000);

  setInterval(newInterval: number){
    RefreshService.interval$.next(newInterval);
  }

  public withRefresh() {
    return RefreshService.interval$
      .switchMap((int: number) => Observable
        .interval(int)
        .startWith(0)
      );
  }
}

Then any component can use this service as follows (the chain still needs a final subscribe() before anything runs):

this.refreshService
  .withRefresh()
  .switchMap(() => /* do something on each interval of the timer */);
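
For reference, a sketch of the same service in RxJS 6+ pipeable style, with a subscribed usage example. It assumes the service is provided as a singleton, so an instance field replaces the static one above; the scheduler constructor parameter is a hypothetical addition that lets the demo run under virtual time instead of waiting 30 seconds per tick.

```typescript
import {
  BehaviorSubject, Observable, SchedulerLike, VirtualTimeScheduler,
  asyncScheduler, interval
} from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

class RefreshService {
  // Holds the current interval and replays it to every new subscriber.
  private readonly interval$ = new BehaviorSubject<number>(30000);

  // `scheduler` is not part of the original service; it exists for the demo.
  constructor(private scheduler: SchedulerLike = asyncScheduler) {}

  setInterval(newInterval: number): void {
    this.interval$.next(newInterval);
  }

  withRefresh(): Observable<number> {
    return this.interval$.pipe(
      // switchMap drops the running timer whenever a new interval arrives.
      switchMap(ms => interval(ms, this.scheduler).pipe(startWith(0)))
    );
  }
}

// Demo under virtual time: the interval changes from 30 s to 10 s at t = 45 s.
const scheduler = new VirtualTimeScheduler();
const service = new RefreshService(scheduler);
const tickTimes: number[] = [];

const sub = service.withRefresh().subscribe(() => {
  tickTimes.push(scheduler.now());     // a real component would reload data here
});

scheduler.schedule(() => service.setInterval(10000), 45000);
scheduler.maxFrames = 70000;           // stop the virtual clock at t = 70 s
scheduler.flush();
sub.unsubscribe();

console.log(tickTimes);                // [0, 30000, 45000, 55000, 65000]
```

Because of startWith(0), every change of interval triggers an immediate refresh (the tick at 45000 in the demo) before the new period begins.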