How can I use RxJs to hold off any requests for an

2019-02-17 03:54发布

问题:

I have an observable that represents an action that is triggered by some outside component. For the purpose of this question, let's call it createBananaAction. I have a bananaService with a method create that performs an AJAX request and returns the created banana as a Promise.

So, whenever some data arrives from the createBananaAction, we want to call bananaService.create().

The code looks like this: (using RxJs)

this.createdBananas = createBananaAction.flatMap(() => bananaService.create());

Now, the challenge is to "throttle" the createBananaAction so that it can only request another banana after it received its previous one. Simply put: there can never be two simultaneous calls to bananaService.create(). Note that I do not want to throttle in time, but rather ignore all incoming requests for a new banana while the bananaService is doing its thing.

I did some research and found the seemingly appropriate pausable operator.

My code now looks like this:

const pausableCreateBananaAction = createBananaAction.pausable();

this.createdBananas = pausableCreateBananaAction
    .do(() => pausableCreateBananaAction.pause())
    .flatMap(() => bananaService.create())
    .do(() => pausableCreateBananaAction.resume());

This seems to work, but I don't like the fact that I need these do statements to manually trigger the pause and resume statements.

I found out you can pass an observable into pausable which should then produce false or true at appropriate times, but this also requires me to manually push values in a subject. Something like this:

const letItGoThrough = new Rx.Subject();

this.createdBananas = createBananaAction
    .pausable(letItGoThrough.startWith(true))
    .do(() => letItGoThrough.onNext(false))
    .flatMap(() => bananaService.create())
    .do(() => letItGoThrough.onNext(true));

So now I have an Rx.Subject (subjects are like training wheels for RxJs, it's what you use until you become experienced enough at RxJs that you no longer need them.) and two calls to do.

Is there a fully "Reactive" way of doing what I want? Or am I stuck with this construct that looks and feels a bit too imperative for my tastes.

Thank you in advance.

回答1:

Just use flatMapFirst instead of flatMap:

this.createdBananas = createBananaAction.flatMapFirst(() => bananaService.create());

The above assumes your bananaService.create() returns a cold observable. If it returns a hot observable or a Promise, then we need to wrap the call with Observable.defer to convert it to a cold observable so that flatMapFirst can control its firing properly:

this.createdBananas = createBananaAction
    .flatMapFirst(() => Rx.Observable.defer(() => bananaService.create()));