Using highland.js to perform async tasks in series

2019-04-13 02:15发布

问题:

I have a stream of events:

var eventStream = _([{
  id: 1, foo: 'bar'
}, {
  id: 2, foo: 'baz'
}]);

For each event in the stream I need to load an instance of a model (my DAL returns promises) and then call a method on each instance of the model, passing some data from the original event data as an argument.

Loading instances of the model was not too difficult:

eventStream.map(function(data) {
    return getModelPromise(data.id);
}).map(_).merge(); // result is a stream of model instances

But once I have the model, I can't figure out how to invoke a method on the model and pass data.foo to it. Basically, for each instance I need to do:

modelInstance.doStuff(data.foo);

I've played with forking the stream, pulling models on the fork and then using zip and invoke in different combinations, but I haven't had any luck. With async I would have handled this pretty simply through proper user of closures. How can I accomplish this with streams using highland.js?

回答1:

Simplest thing to do might be to wrap getModelPromise so that it returns a promise resolving to an object with your model and your data as properties instead of just your model.

Or, if you don't want to use a promise you can do it in Highland:

var modelStream = eventStream.map(function (data) {
    return _(getModelPromise(data.id)).map(function (model) {
        return {data: data, model: model};
    });
}).parallel(10);

// then...
modelStream.map(function (x) {
    x.model.doStuff(x.data.foo);
});

Zipping the modelStream and an observed version of the eventStream should also work, but I usually prefer to pass around objects which contain everything you need.