Using a stream as input to an async queue in Node.js

Posted 2019-07-24 20:26

Question:

I will be reading a series of inputs from a stream and performing an HTTP GET request per input. To avoid opening too many connections at once, I am using async.queue to queue up these inputs.

After all inputs have been read (the stream has emitted end), I would like to collect the results and generate an overview.

I am currently using queue.drain for this purpose. However, queue.drain may be called multiple times in my case: the process can block waiting for input, and whenever that happens the queue runs empty.

So, is there anything in the async library that ensures queue.drain is called only once when dealing with streams? I don't mind switching to another control-flow approach as long as it provides this behavior.

Answer 1:

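Attach the drain handler only when you receive the end event on your readable stream. Until then the queue has no drain handler, so running empty while waiting for more input triggers nothing.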

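var async = require('async');

var s = new SomeReadableStream();
var q = async.queue(your_callback, 1);  // your_callback(task, done) is the worker

// Feed every input read from the stream into the queue
s.on('data', function(input) {
  q.push(input);
});

s.on('end', function() {
  // Beware: if the queue is already empty, the drain callback will never be called,
  // so we have to check for that ourselves
  if (q.running() === 0 && q.length() === 0) {
    drain_cb();
  }
  else {
    // Property assignment is the async v2 style; in async v3 call q.drain(drain_cb)
    q.drain = drain_cb;
  }
});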
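
The same "fire once, only after end" idea can also be sketched without relying on drain at all, by counting tasks in flight and remembering whether the input has ended. This is only an illustration under assumptions: createCompletionTracker, processStream, and the setTimeout-based worker are hypothetical names made up for this example, not part of the async library, and the worker stands in for the real HTTP GET. The sketch does not limit concurrency by itself; with async.queue you would presumably call taskStarted() before q.push(item, callback) and taskFinished() inside that per-task callback.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: invokes onDone exactly once, after the input has
// ended AND every started task has finished.
function createCompletionTracker(onDone) {
  let pending = 0;    // tasks started but not yet finished
  let ended = false;  // has the input stream ended?
  let done = false;   // guards against calling onDone twice

  function check() {
    if (ended && pending === 0 && !done) {
      done = true;
      onDone();
    }
  }

  return {
    taskStarted() { pending += 1; },
    taskFinished() { pending -= 1; check(); },
    inputEnded() { ended = true; check(); },
  };
}

// Hypothetical wiring: run worker(item, callback) for each stream item and
// hand the collected results to onDone once everything has completed.
function processStream(stream, worker, onDone) {
  const results = [];
  const tracker = createCompletionTracker(() => onDone(results));

  stream.on('data', (item) => {
    tracker.taskStarted();
    worker(item, (err, result) => {
      results.push(err ? { item, error: err.message } : { item, result });
      tracker.taskFinished();
    });
  });
  stream.on('end', () => tracker.inputEnded());
}

// Demo with a fake worker that stands in for the HTTP GET request.
function fakeWorker(item, callback) {
  setTimeout(() => callback(null, item.toUpperCase()), 10);
}

processStream(Readable.from(['a', 'b', 'c']), fakeWorker, (results) => {
  console.log('overview:', results.length, 'results collected');
});
```

Because the tracker checks both conditions every time a task finishes or the input ends, it covers the case the answer warns about (the queue is already idle when end arrives) and ignores moments where the queue is merely empty while waiting for more input.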