Promise .all() with RxJS

2020-01-31 05:29发布

问题:

I'm writing an app in Angular 2 and I want to execute several http requests and run a function on the responses.

In Angular 1, I would write something like $q.all([$http.get(...), $http.get(...), ...]).then(doSomethingWithResponses);

But Angular 2 returns RxJS Observables and after a bunch of reading I still can't figure out how to get the responses of several http requests. How can this can be done?

回答1:

As @Eric Martinez pointed out, there is forkJoin. forkJoin runs all observable sequences in parallel and collect their last elements.

Rx.Observable.forkJoin([a,b]).subscribe(t=> {
        var firstResult = t[0];
        var secondResult = t[1];
});


回答2:

I'm not sure you'd want to use forkJoin/zip, especially considering combineLatest is easier to understand and will emit on every sub-stream event, whereas forkJoin basically samples on every sub-stream having emitted.

This might come to bite you later when you want to combine multi-item Observables down the road.



回答3:

Wouldn't a merge work? You can subscribe and attach a handler to the onComplete callback.

I first build an array of my observables and then use static merge:

let obs_ary: any = [obs1, obs2, obs3];
Observable.merge(...obs_ary);


回答4:

I'm learning RxJS and I was trying to do the same thing with RxJS v5

It seems like we don't have forkJoin on v5 anymore, so here's how I got it working (works with flatMap or mergeMap, which are aliases):

const callOne = value =>
    new window.Promise(resolve =>
        setTimeout(() => resolve(value + 10), 3000)
    );

const callTwo = value =>
    new window.Promise(resolve =>
        setTimeout(() => resolve(value + 20), 1000)
    );

Rx.Observable
    .of(2)
    .do(() => console.log('querying...'))
    .mergeMap(number =>
        Rx.Observable.zip(
            Rx.Observable.fromPromise(callOne(number)),
            Rx.Observable.fromPromise(callTwo(number))
        )
    ).concatAll()
    .subscribe(createSubscriber('promises in parallel'));


标签: angular rxjs