I have a stream of events:
var eventStream = _([{
    id: 1, foo: 'bar'
}, {
    id: 2, foo: 'baz'
}]);
For each event in the stream, I need to load an instance of a model (my DAL returns promises) and then call a method on that instance, passing some of the original event's data as an argument.
Loading instances of the model was not too difficult:
eventStream.map(function(data) {
    return getModelPromise(data.id);
}).map(_).merge(); // result is a stream of model instances
But once I have the model, I can't figure out how to invoke a method on it and pass data.foo to that method. Basically, for each instance I need to do:
modelInstance.doStuff(data.foo);
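To make the goal concrete, here is a minimal sketch of the closure-based version in plain promises, without highland. The getModelPromise stub and the string returned by doStuff are hypothetical stand-ins for the real DAL and model, included only so the snippet runs on its own:

```javascript
// Hypothetical stand-in for the DAL: resolves to a model instance for the given id.
function getModelPromise(id) {
    return Promise.resolve({
        id: id,
        // doStuff is stubbed to return a string so the pairing is visible.
        doStuff: function (foo) {
            return 'model ' + id + ' got ' + foo;
        }
    });
}

var events = [{ id: 1, foo: 'bar' }, { id: 2, foo: 'baz' }];

// Each callback closes over its own `data`, so data.foo is still
// available once the model for data.id has finished loading.
var results = Promise.all(events.map(function (data) {
    return getModelPromise(data.id).then(function (modelInstance) {
        return modelInstance.doStuff(data.foo);
    });
}));
```

This pairing of each model with the data.foo from its own event is what I want to reproduce in the stream pipeline.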
I've played with forking the stream, pulling models on the fork, and then using zip and invoke in different combinations, but I haven't had any luck. With async I would have handled this pretty simply through proper use of closures. How can I accomplish this with streams using highland.js?