RxJS Map array to observable and back to plain obj

2019-06-24 19:11发布

问题:

I have an array of objects from which I need to pass each object separately into async method (process behind is handled with Promise and then converted back to Observable via Observable.fromPromise(...) - this way is needed because the same method is used in case just single object is passed anytime; the process is saving objects into database). For example, this is an array of objects:

[
  {
    "name": "John",
    ...
  },
  {
    "name": "Anna",
    ...
  },
  {
    "name": "Joe",,
    ...
  },
  {
    "name": "Alexandra",
    ...
  },
  ...
]

Now I have the method called insert which which inserts object into database. The store method from database instance returns newly created id. At the end the initial object is copied and mapped with its new id:

insert(user: User): Observable<User> {
  return Observable.fromPromise(this.database.store(user)).map(
    id => {
      let storedUser = Object.assign({}, user);
      storedUser.id = id;
      return storedUser;
    }
  );
}

This works well in case I insert single object. However, I would like to add support for inserting multiple objects which just call the method for single insert. Currently this is what I have, but it doesn't work:

insertAll(users: User[]): Observable<User[]> {
  return Observable.forkJoin(
    users.map(user => this.insert(user))
  );
}

The insertAll method is inserting users as expected (or something else filled up the database with that users), but I don't get any response back from it. I was debugging what is happening and seems that forkJoin is getting response just from first mapped user, but others are ignored. Subscription to insertAll does not do anything, also there is no any error either via catch on insertAll or via second parameter in subscribe to insertAll.

So I'm looking for a solution where the Observable (in insertAll) would emit back an array of new objects with users in that form:

[
  {
    "id": 1,
    "name": "John",
    ...
  },
  {
    "id": 2,
    "name": "Anna",
    ...
  },
  {
    "id": 3,
    "name": "Joe",,
    ...
  },
  {
    "id": 4,
    "name": "Alexandra",
    ...
  },
  ...
]

I would be very happy for any suggestion pointing in the right direction. Thanks in advance!

回答1:

To convert from array to observable you can use Rx.Observable.from(array).

To convert from observable to array, use obs.toArray(). Notice this does return an observable of an array, so you still need to .subscribe(arr => ...) to get it out.

That said, your code with forkJoin does look correct. But if you do want to try from, write the code like this:

insertAll(users: User[]): Observable<User[]> {
  return Observable.from(users)
    .mergeMap(user => this.insert(user))
    .toArray()
  );
}

Another more rx like way to do this would be to emit values as they complete, and not wait for all of them like forkJoin or toArray does. We can just omit the toArray from the previous example and we got it:

insertAll(users: User[]): Observable<User> {
  return Observable.from(users)
    .mergeMap(user => this.insert(user))
  );
}

As @cartant mentioned, the problem might not be in Rx, it might be your database does not support multiple connections. In that case, you can replace the mergeMap with concatMap to make Rx send only 1 concurrent request:

insertAll(users: User[]): Observable<User[]> {
  return Observable.from(users)
    .concatMap(user => this.insert(user))
    .toArray() // still optional
  );
}