Combining multiple FirebaseListObservables

2019-08-02 10:28发布

问题:

const placeId = this.getPlaceId();
        this.af.database.list(`placeUsers/${placeId}`).subscribe((userKeys) => {
            for (let index = 0; index < userKeys.length; index++) {
                let userKey = userKeys[index];

                this.af.database.list(`userDevices/${userKey.$key}`).subscribe((deviceKeys) => {

                    for (let index = 0; index < deviceKeys.length; index++) {
                        let deviceKey = deviceKeys[index];

                        this.af.database.object(`devices/${deviceKey.$key}`).subscribe((device) => {

                            console.log(device);
                            // Device received.    

                        });
                    }
                });
            }
        });

I'm currently trying to send notifications to all my users that are following a place. The current flow goes like this:

  • Getting the users that belong to a place (placeUsers node)
  • Getting the deviceKeys that belong to the users (userDevices node)
  • Getting the devices from the deviceKeys (devices node)

I was wondering if there was a way to combine all these calls into a single observable call.

What my current problem is, is that i am unable to know when all of these requests are done. I've looked into RxJs, which would allow me to combine all these observables. But i haven't found a good solution on how to do it with four nodes.

回答1:

You can use concatMap and forkJoin to compose an observable that emits the devices. This composed observable will emit a single array of devices and will then complete (as the first operator is used to take only the first emitted list or object):

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/first';
import 'rxjs/add/operator/forkJoin';

this.af.database
  .list(`placeUsers/${placeId}`)
  .first()
  .concatMap(userKeys => {
    let observables = userKeys.map(userKey => this.af.database
      .list(`userDevices/${userKey.$key}`)
      .first()
    );
    return observables.length ?
      Observable.forkJoin(...observables, (...lists) => [].concat(...lists)) :
      Observable.of([])
  })
  .concatMap(deviceKeys => {
    let observables = deviceKeys.map(deviceKeys => this.af.database
      .object(`devices/${deviceKey.$key}`)
      .first()
    );
    return observables.length ?
      Observable.forkJoin(...observables) :
      Observable.of([])
  })
  .subscribe(devices => console.log(devices));

If you want an observable that does not complete and emits the devices for a place whenever the users for a places or their devices change, use switchMap instead of concatMap, combineLatest instead of forkJoin and remove the first operators:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/combineLatest';
import 'rxjs/add/operator/switchMap';

this.af.database
  .list(`placeUsers/${placeId}`)
  .switchMap(userKeys => {
    let observables = userKeys.map(userKey => this.af.database
      .list(`userDevices/${userKey.$key}`)
    );
    return observables.length ?
      Observable.combineLatest(...observables, (...lists) => [].concat(...lists)) :
      Observable.of([])
  })
  .switchMap(deviceKeys => {
    let observables = deviceKeys.map(deviceKeys => this.af.database
      .object(`devices/${deviceKey.$key}`)
    );
    return observables.length ?
      Observable.combineLatest(...observables) :
      Observable.of([])
  })
  .subscribe(devices => console.log(devices));