Observable.forkJoin() doesn't execute

Posted 2019-02-16 11:10

Question:

I have the following code:

// Loop: for each user ID / role ID pair, fetch the role and the user
userMeta.forEach((businessRole) => {
  Observable.forkJoin(
    af.database.object('/roles/' + businessRole.$value),
    af.database.object('/users/' + businessRole.$key)
  ).subscribe(
    data => {
      console.log("Data received");
      data[1].role = data[0];
      this.users.push(data[1]);
    },
    err => console.error(err)
  );
});

I am trying to subscribe to the combined result of two observables using forkJoin.

For some reason, the "Data received" message is never shown.

My userMeta variable looks fine when I log it with console.log.

What's wrong?

Update: the following code does not output anything either:

let source = Observable.forkJoin(
  af.database.object('/roles/'+businessRole.$value),
  af.database.object('/users/'+businessRole.$key)
);
let subscription = source.subscribe(
  function (x) {
    console.log("GOT: " + x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  }
);

What I am actually trying to do is improve the performance of the following code:

// Subscription 3: role ID to role name
af.database.object('/roles/'+businessRole.$value)
  .subscribe((roleData) => {
    // Subscription 4: get user info
    af.database.object('/users/'+businessRole.$key).subscribe(user => {

Answer 1:

forkJoin() requires all source Observables to emit at least once and to complete.

The following demo completes as expected:

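import { forkJoin, from } from 'rxjs';

// Both sources emit and then complete, so forkJoin emits the last value of each.
const subscription = forkJoin(
  from([1,2,3]),
  from([9,8,7,6])
).subscribe(
  x => console.log('GOT:', x),
  err => console.log('Error:', err),
  () => console.log('Completed')
);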

Live demo: https://stackblitz.com/edit/rxjs-urhkni

Output:

GOT: 3,6
Completed

Jan 2019: Updated for RxJS 6
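
If the sources in the question are long-lived streams that emit but stay open, one common way to satisfy the "must complete" requirement is to cut each source off after its first value with take(1). The sketch below is only an illustration under that assumption: role$ and user$ are hypothetical stand-ins built from concat(of(...), NEVER), not real AngularFire observables.

```typescript
import { concat, forkJoin, NEVER, of } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins: each emits one value and then stays open forever.
const role$ = concat(of({ name: 'admin' }), NEVER);
const user$ = concat(of({ email: 'a@example.com' }), NEVER);

const withoutTake: unknown[] = [];
const withTake: unknown[] = [];

// Neither source completes, so forkJoin never emits.
forkJoin([role$, user$]).subscribe(x => withoutTake.push(x));

// take(1) completes each source after its first value, so forkJoin emits once.
forkJoin([role$.pipe(take(1)), user$.pipe(take(1))])
  .subscribe(x => withTake.push(x));

console.log(withoutTake.length); // 0
console.log(withTake.length);    // 1
```

The trade-off is that take(1) gives a one-time snapshot: later changes in the underlying data are no longer delivered.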



Answer 2:

I had a similar issue using Angular 2 / AngularFire 2, specifically when looking up whether users existed by email. In one case, the user existed and I received an array containing one object from the Observable. In the other case, the user did not exist and I received an empty array.

When I used forkJoin with a resultSelector and a subscribe, neither the resultSelector nor the subscribe function ever ran. However, I then tried zip:

Observable.zip(
  FirebaseListObservable,
  FirebaseListObservable,
  (...results) => {
    return results.map(some code here)
  }
).subscribe(res => console.log(res));

Both the selector and the subscribe worked. I assume this relates to @martin's answer: forkJoin requires the observables to complete because, by definition, it returns the last emissions. If an observable never completes, it can never have a last emission.

Perhaps AngularFire list observables (or object observables in your case) never complete, which makes forkJoin unusable. Fortunately, zip has similar behavior and still works; the difference is that it can fire several times if the data changes in Firebase, whereas forkJoin only combines the last response.

In my case, I'm looking at either 1) using zip and accepting that my code might run multiple times if user data changes while the zip is still active, 2) manually disabling the zip after the first set of data returns, or 3) ditching AngularFire and trying the Firebase API directly, using something like .once to see if I can get an observable that completes and triggers forkJoin.
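
To make options 1 and 2 concrete, here is a small sketch of how zip behaves on sources that never complete, and how take(1) can stop it after the first pair. The two Subjects are hypothetical stand-ins for Firebase-backed streams; the names users$ and roles$ are made up for this illustration.

```typescript
import { Subject, zip } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins for two live streams that never complete.
const users$ = new Subject<string[]>();
const roles$ = new Subject<string[]>();

const everyPair: Array<[string[], string[]]> = [];
const firstPairOnly: Array<[string[], string[]]> = [];

// Option 1: plain zip runs once for every matched pair of emissions.
zip(users$, roles$).subscribe(pair => everyPair.push(pair));

// Option 2: zip + take(1) unsubscribes after the first pair arrives.
zip(users$, roles$).pipe(take(1)).subscribe(pair => firstPairOnly.push(pair));

// Initial data arrives on both streams.
users$.next(['alice']);
roles$.next(['admin']);

// Simulated later change in the data.
users$.next(['alice', 'bob']);
roles$.next(['admin', 'editor']);

console.log(everyPair.length);     // 2
console.log(firstPairOnly.length); // 1
```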



Answer 3:

I've faced a similar issue: I was creating a list of observables dynamically, and I noticed that forkJoin() never emits a value if the list of observables is empty (it just completes without calling the next handler), whereas Promise.all() resolves with an empty list:

Observable.forkJoin([])
    .subscribe(() => console.log('do something here')); // This is never called

The workaround I've found is to check the length of the list and skip the operator when it is empty:

return jobList.length ? Observable.forkJoin(jobList) : Observable.of([]);
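
For reference, the same guard could look roughly like this in RxJS 6 syntax. This is a sketch, not the original code: runAll is a hypothetical helper name, and the log array exists only to record which callbacks fire. As far as I can tell, an empty forkJoin in RxJS 6 completes immediately without a next notification, so only the next handler is skipped.

```typescript
import { forkJoin, Observable, of } from 'rxjs';

// Hypothetical helper: combine a dynamic job list, emitting [] when it is empty.
function runAll<T>(jobList: Observable<T>[]): Observable<T[]> {
  return jobList.length ? forkJoin(jobList) : of([] as T[]);
}

const log: string[] = [];
const noJobs: Observable<number>[] = [];

// Bare forkJoin on an empty list: completes, but next is never called.
forkJoin(noJobs).subscribe({
  next: () => log.push('bare: next'),
  complete: () => log.push('bare: complete'),
});

// Guarded version: the subscriber receives an empty array.
runAll(noJobs).subscribe(r => log.push(`guarded empty: ${JSON.stringify(r)}`));

// Non-empty list: behaves like a normal forkJoin.
runAll([of(1), of(2)]).subscribe(r => log.push(`guarded: ${JSON.stringify(r)}`));

console.log(log);
// [ 'bare: complete', 'guarded empty: []', 'guarded: [1,2]' ]
```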