How to use drain event of stream.Writable in Node.

2019-01-24 05:57发布

问题:

In Node.js I'm using the fs.createWriteStream method to append data to a local file. In the Node documentation they mention the drain event when using fs.createWriteStream, but I don't understand it.

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

In the code above, how can I use the drain event? Is the event used properly below?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}

回答1:

The drain event is for when a writable stream's internal buffer has been emptied.

This can only happen when the size of the internal buffer once exceeded its highWaterMark property, which is the maximum bytes of data that can be stored inside a writable stream's internal buffer until it stops reading from the data source.

The cause of something like this can be due to setups that involve reading a data source from one stream faster than it can be written to another resource. For example, take two streams:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

Now imagine that the file read is on a SSD and can read at 500MB/s and write is on a HDD that can only write at 150MB/s. The write stream will not be able to keep up, and will start storing data in the internal buffer. Once the buffer has reached the highWaterMark, which is by default 16KB, the writes will start returning false, and the stream will internally queue a drain. Once the internal buffer's length is 0, then the drain event is fired.

This is how a drain works:

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

And these are the prerequisites for a drain which are part of the writeOrBuffer function:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;

To see how the drain event is used, take the example from the Node.js documentation.

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

The function's objective is to write 1,000,000 times to a writable stream. What happens is a variable ok is set to true, and a loop only executes when ok is true. For each loop iteration, the value of ok is set to the value of stream.write(), which will return false if a drain is required. If ok becomes false, then the event handler for drain waits, and on fire, resumes the writing.


Regarding your code specifically, you don't need to use the drain event because you are writing only once right after opening your stream. Since you have not yet written anything to the stream, the internal buffer is empty, and you would have to be writing at least 16KB in chunks in order for the drain event to fire. The drain event is for writing many times with more data than the highWaterMark setting of your writable stream.



回答2:

Imagine you're connecting 2 streams with very different bandwidths, say, uploading a local file to a slow server. The (fast) file stream will emit data faster than the (slow) socket stream can consume it.

In this situation, node.js will keep data in memory until the slow stream gets a chance to process it. This can get problematic if the file is very large.

To avoid this, Stream.write returns false when the underlying system buffer is full. If you stop writing, the stream will later emit a drain event to indicate that the system buffer has emptied and it is appropriate to write again.

You can use pause/resume the readable stream and control the bandwidth of the readable stream.

Better: you can use readable.pipe(writable) which will do this for you.

EDIT: There's a bug in your code: regardless of what write returns, your data has been written. You don't need to retry it. In your case, you're writing data twice.

Something like this would work:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}