Subscribing to a nested Observable

2019-01-09 17:55发布

问题:

I have an app that makes one http request to get a list of items and then makes an http request for each item in the list to get more detailed information about each item. Effectively:

class ItemsService {
  fetchItems() {
    return this.http.get(url)
    .map(res => res.json())
    .map(items => items.map(this.fetchItem(item)));
  }

  fetchItem(item: Item) {
    this.http.get(`${url}/${item.id}`)
    .map(res => res.json());
  }
}

Then I'll do something like itemsService.fetchItems().subscribe(items => console.log(items)) but what ends up happening is I get an array of observables (each response from fetchItem). I need to subscribe to each of the internal observables as well so that the fetchItem request actually gets triggered.

I've also tried using flatMap instead of map but it seems to have the same result in this case. Is there any way for the nested observable to be subscribed to?

回答1:

I'd do it like the following:

function mockRequest() {
    return Observable.of('[{"id": 1}, {"id": 2}, {"id": 3}]');
}
function otherMockRequest(id) {
    return Observable.of(`{"id":${id}, "desc": "description ${id}"}`);
}

class ItemsService {
    fetchItems() {
        return mockRequest()
            .map(res => JSON.parse(res))
            .concatAll()
            .mergeMap(item => this.fetchItem(item));
    }

    fetchItem(item: Item) {
        return otherMockRequest(item.id)
            .map(res => JSON.parse(res));
    }
}

let service = new ItemsService();
service.fetchItems().subscribe(val => console.log(val));

See live demo: http://plnkr.co/edit/LPXfqxVsI6Ja2J7RpDYl?p=preview

I'm using a trick with .concatAll() to convert an array of Objects such as [{"id": 1}, {"id": 2}, {"id": 3}] into separate values emitted one by one {"id": 1}, {"id": 2} and {"id": 3} (as of now it's an undocumented feature). Then I use mergeMap() to fetch their content in a separate request and merge it's result into the operator chain.

This plnkr example prints to console:

{ id: 1, desc: 'description 1' }
{ id: 2, desc: 'description 2' }
{ id: 3, desc: 'description 3' }


回答2:

The problem you likely encountered is that you did not flatten enough.

flatMap or mergeMap will flatten Observables, Promises, Arrays, even generators (don't quote me on that last one), just about anything you want to throw at it.

So when you do .flatMap(items => items.map(item => this.fetchItem(item)), you are really just doing Observable<Array<Item>> => Observable<Observable<Item>>

When you just do map you are doing Observable<Array<Item>> => Observable<Array<Observable<Item>>>.

What you need to do is first flatten out the Array and then flatten out each request:

class ItemsService {
  fetchItems() {
    return this.http.get(url)
    .map(res => res.json())
    // Implicitly map Array into Observable and flatten it
    .flatMap(items => items)
    // Flatten the response from each item
    .flatMap((item: Item) => this.fetchItem(item));
  }
}

Now the above works if you don't mind getting each item response individually. If you need to get all of the items then you should use forkJoin on all the inner values, but you would still need flatMap in order to flatten the resulting inner value:

fetchItems(): Observable<Response[]> {
  return this.http.get(url)
    .map(res => res.json())
    .flatMap(items => {
       const requests = items.map(item => this.fetchItem(item));
       return Rx.Observable.forkJoin(requests);
    });
}


回答3:

You can break up the items array before the line that calls this.fetchItem. You can use mergeMap on an Observable whose value is an array and each item will be emitted individually.

fetchItems() {
    return this.http.get(url)
       .map(res => res.json())
       .mergeMap(arrItem => this.fetchItem(arrItem));
}

Edit: I guess I should have provided more explanation. mergeMap is synonymous with flatMap in rxjs. Typically, you use flatMap when your projection function returns an Observable, but it will also flatten arrays as well, so, calling mergeMap will then emit each item individually, I thought that was what OP wanted to achieve. I also realized, you can combine the mergeMap call and last map call, because the projection for mergeMap will be called for each item in the array, I made the changes in the code above.