Observable.concat called with an array returns the same array of Observables

Posted 2019-07-24 17:21

In my service I am trying to solve the following problem. I have a JSON file containing the names of other JSON files (cards):

{
  "filename1" : ... ,
  "filename2" : ... ,
  ...
  "filenameN" : ...
}

I need to load all of those files. Each "filenameX" file holds some card data:

{
  dataX
}

I need to combine the loaded data into a single object:

{
  "filename1" : { data1 },
  "filename2" : { data2 },
...
  "filenameN" : { dataN }
}

I create an Observable for each file load and try to combine them into a single high-level Observable that completes when all of the underlying Observables have completed. Here is my code:

import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/concat";
import "rxjs/add/operator/map";
...
    _loadCards(dir, cardsListFile) {
        var http = ...
        var url = ...
        return Observable.create(function (observer) {
            http.get(url + dir + "/" + cardsListFile + ".json").map(res => { return res.json(); }).subscribe(list => {
                let data = {};
                let observers = [];
                for(var card in list) {
                    if(list.hasOwnProperty(card)) {
                        let obs = http.get(url + dir + "/cards/" + card + ".json").map(res => { return res.json(); });
                        observers.push(obs);
                        let getData = function(card) {
                            obs.subscribe(cardData => {
                                data[card] = cardData;
                            });
                        };
                        getData(card);
                    }
                }

                let concatResult = Observable.concat(observers);
                console.log(concatResult);
                concatResult.subscribe(result => {
                    observer.onNext(data);
                    observer.onCompleted();
                });         
            });
        });
    };

However, the concat operator doesn't work as described: it returns the same array of Observables it was given as input. Where is the problem, and what other operators could I use to make my solution more straightforward (because right now it's definitely ugly)?

3 answers
Anthone
#2 · 2019-07-24 17:47

I'm not completely sure I understand what you want to do, but the concat() operator takes its parameters unpacked and not as an array.

This means that Observable.concat(observables) will just re-emit the Observables as they are in the observables array.

What you want instead is to pass the array unpacked, Observable.concat(...observables), which is the same as:

Observable.concat(observable1, observable2, observable3 ,...)
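
The difference between the two call styles can be sketched in plain JavaScript, without RxJS. This is only an analogy: arrays stand in for Observables, and concatAnalogy is a hypothetical helper (not an RxJS function) that treats each argument as one source and emits the items of every source in order.

```javascript
// Plain-JavaScript analogy, no RxJS involved: arrays stand in for Observables.
// concatAnalogy is a hypothetical helper that treats EACH ARGUMENT as one
// source and emits the items of every source, in order.
function concatAnalogy(...sources) {
  const emitted = [];
  for (const source of sources) {
    for (const item of source) {
      emitted.push(item);
    }
  }
  return emitted;
}

const sourceA = [1, 2];
const sourceB = [3, 4];
const sourcesArray = [sourceA, sourceB];

// One argument: the outer array is the only source, so its items
// (the inner sources themselves) come out unchanged.
const passedAsArray = concatAnalogy(sourcesArray);

// Spread: each inner array becomes its own source and its items are emitted.
const passedSpread = concatAnalogy(...sourcesArray);

console.log(passedAsArray);
console.log(passedSpread);
```

In this sketch the first call hands back the sources themselves and only the second call hands back their contents, which mirrors the symptom described in the question.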

I think this simulates what you're trying to do. It uses forkJoin instead of concat, so it waits until all of the Observables complete.

var input = {
  "filename1": 'data1',
  "filename2": 'data2',
  "filenameN": 'dataN'
};

let keys = Object.keys(input);

let observables = keys.map(key => {
  return Observable.of(input[key]).map(s => s.split("").reverse().join(""));
});

Observable.forkJoin(observables, (...results) => {
    let combined = {};
    keys.forEach((key, i) => {
      combined[key] = results[i];
    });
    return combined;
  })
  .subscribe(val => console.log(val));

This prints to console:

{ filename1: '1atad', filename2: '2atad', filenameN: 'Natad' }

See live demo: https://jsbin.com/coruyu/4/edit?js,console

爷、活的狠高调
#3 · 2019-07-24 17:49

JSBIN with solution: http://jsbin.com/dahaqif/edit?js,console

I'm assuming you have two observables, one to get the list of filenames, the other to get the data for a specific filename:

const filenamesObs = Observable.of({
  "filename1" : null,
  "filename2" : null,
  "filename3" : null
});
// This is actually a function that returns an observable.
const filedataObs = (filename) => {
  return Observable.of(`This is the data for ${filename}`);
}

Now, here's how you can combine the two observables to obtain the data structure you described:

const source = filenamesObs

  // Extract the list of filenames as an array.
  .map(obj => Object.keys(obj))

  // Flatten the array, i.e. emit each filename individually
  // vs a SINGLE array containing ALL filenames.
  .mergeMap(val => val)

  // Get the data for each filename.
  .mergeMap(filename =>
    filedataObs(filename).map(data => Object.assign({}, { filename: filename, data: data }))
  )

  // At this point, you have a stream of { filename, data } objects.
  // Reduce everything back to a single object.
  .reduce((acc, curr) => {
    acc[curr.filename] = curr.data;
    return acc;
  }, {});

If you subscribe and log the result to the console, you'll see:

[Object] {
  filename1: "This is the data for filename1",
  filename2: "This is the data for filename2",
  filename3: "This is the data for filename3"
}

Additional clarifications following Arseniy's comment

  • The first mergeMap() is a "trick" to transform the SINGLE array of values we have at the beginning into MULTIPLE, INDIVIDUAL values. Since you want to be able to process each filename individually (to fetch the corresponding data), it's more convenient to receive the filenames one by one vs as a big array containing all filenames. To make it crystal clear, you should go to my JSBIN and add the line .do(console.log) before and after the first mergeMap(). You'll immediately understand the difference.
  • The second mergeMap() "projects" the values of a source observable into a target observable. It sounds fancy but it simply means we are transforming the filenames (1st observable) into HTTP requests to get the files' data (2nd observable).
  • Finally, Object.assign() lets me put everything back together. Since we have data coming from two observables (the one emitting the filenames and the one emitting the file data), we need to merge everything into a single container before we can use it (or transform it further). For this I'm using Object.assign() to create a temporary object with two properties, filename and data. Note that this is vanilla JavaScript; it has nothing to do with observables. Again, you can add a .do(console.log) line after the second mergeMap() to print out to the console what the stream contains at this point.
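
The shape of the data at each of the steps above can be traced with plain arrays, without RxJS. This is a rough sketch under stated assumptions: an ordinary array stands in for the stream, and lookupData is a hypothetical synchronous stand-in for filedataObs, so nothing here is asynchronous.

```javascript
// Plain-JavaScript trace of the data shapes only; no RxJS, nothing asynchronous.
const filenamesObject = { filename1: null, filename2: null, filename3: null };

// Hypothetical synchronous stand-in for filedataObs(filename).
const lookupData = (filename) => `This is the data for ${filename}`;

// Step 1: one object becomes a list of individual filenames
// (the role of .map(Object.keys) followed by the first mergeMap).
const individualFilenames = Object.keys(filenamesObject);

// Step 2: each filename is paired with its data in a temporary object
// (the role of the second mergeMap together with Object.assign).
const filenameDataPairs = individualFilenames.map((filename) =>
  Object.assign({}, { filename: filename, data: lookupData(filename) })
);

// Step 3: the pairs are folded back into a single object (the role of reduce).
const combinedCards = filenameDataPairs.reduce((acc, curr) => {
  acc[curr.filename] = curr.data;
  return acc;
}, {});

console.log(individualFilenames);
console.log(filenameDataPairs);
console.log(combinedCards);
```

Logging each intermediate value here plays the same role as adding .do(console.log) between the operators in the real stream.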
【Aperson】
#4 · 2019-07-24 18:02

Rule #1: Never use custom subscriptions inside other subscriptions - there is maybe 0.1% of cases where this is really needed.

I've rewritten your stream a little to show how it could be done:

_loadCards(dir, cardsListFile) {
    var http = ...
    var url = ...
    return http.get(url + dir + "/" + cardsListFile + ".json")
        .map(res => res.json())
        .switchMap(fileMap => Rx.Observable.from(Object.keys(fileMap)))
        .concatMap(card => http.get(url + dir + "/cards/" + card + ".json")
               .map(res => ({[card]: res.json()})))
        // Collect the per-card objects into one array before merging them.
        .toArray()
        .map(arrayOfCardResultObjects => Object.assign.apply({}, arrayOfCardResultObjects));
};

An adjusted version of this to run live as a snippet:

getMockedCardMap()
    .switchMap(fileMap => Rx.Observable.from(Object.keys(fileMap)))
    .concatMap(card => getMockedCardFromFile(card).map(res => ({[card]: res})))
    .toArray()
    .map(arrayOfCardResultObjects => Object.assign.apply({}, arrayOfCardResultObjects))
    .subscribe(console.log);


// Mocked helper-methods
function getMockedCardMap() {
    return Rx.Observable.of({
        "file1": "foo",
        "file2": "bar",
        "file3": "baz"
    });
}

function getMockedCardFromFile(file) {
    return Rx.Observable.of("Result from: " + file);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
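
One detail of the final merge step may be worth knowing. In Object.assign.apply({}, array) the empty object is only the `this` value, so the first element of the array becomes the merge target and is mutated. A minimal plain-JavaScript sketch, assuming an environment with spread syntax and using made-up sample objects, compares it with passing a fresh target explicitly:

```javascript
// Made-up sample data standing in for the array produced by toArray().
const cardPartsForApply = [{ file1: "A" }, { file2: "B" }, { file3: "C" }];

// apply({}, ...) uses {} as `this`; the merge target is the FIRST array element.
const mergedViaApply = Object.assign.apply({}, cardPartsForApply);

const cardPartsForSpread = [{ file1: "A" }, { file2: "B" }, { file3: "C" }];

// Passing a fresh {} as the first argument leaves the inputs untouched.
const mergedViaSpread = Object.assign({}, ...cardPartsForSpread);

console.log(mergedViaApply === cardPartsForApply[0]);
console.log(Object.keys(cardPartsForSpread[0]).length);
console.log(mergedViaSpread);
```

Both variants produce the same merged object. The difference only matters if the intermediate per-card objects are reused elsewhere, which is not the case in the stream above.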
