Node.js Readable Stream _read Usage

2019-05-09 22:17发布

问题:

I understand how to use writable streams in Node's new Streams2 library, but I don't understand how to use readable streams.

Take, for example, a stream wrapper around the dgram module:

var dgram = require('dgram');

var thumbs = {
  twiddle: function() {}
};

var defaults = {
  address: '0.0.0.0',
  type: 'udp4',
  port: 12345,
  broadcast: null,
  multicast: null,
  multicastTTL: 1
};

var UDPStream = function(options) {
  if (!(this instanceof UDPStream))
    return new UDPStream(options);

  Duplex.call(this);

  options = options || {};

  this.address = options.address || defaults.address;
  this.type = options.type || defaults.type;
  this.port = options.port || defaults.port;
  this.broadcast = options.broadcast || defaults.broadcast;
  this.multicast = options.multicast || defaults.multicast;
  this.multicastTTL = options.multicastTTL || defaults.multicastTTL;

  this._socket = dgram.createSocket(this.type, setup.bind(this));
  this._socket.on('message', this.push.bind(this));
};

util.inherits(UDPStream, Duplex);

var setup = function() {
  if (this.multicast) {
    this._socket.addMembership(this.multicast);
    this._socket.setMulticastTTL(this.multicastTTL);

    this.destination = this.multicast;
  } else {
    // default to using broadcast if multicast address is not specified.
    this._socket.setBroadcast(true);

    // TODO: get the default broadcast address from os.networkInterfaces() (not currently returned)
    this.destination = this.broadcast || '255.255.255.255';
  }
};

UDPStream.prototype._read = function(size) {
  thumbs.twiddle();
};

UDPStream.prototype._write = function(chunk, encoding, callback) {
  this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
  callback();
};

module.exports = UDPStream;

Everything makes sense except for the _read implementation. It's literally twiddling thumbs because I don't understand what I'm supposed to do there. My data is pushed when the udp socket emits a new message, but I have no way of pausing or resuming the underlying resource. What should this look like?

回答1:

_read is part of the pause resume mechanism. From the NodeJS API docs

When data is available, put it into the read queue by calling readable.push(chunk). If push returns false, then you should stop reading. When _read is called again, you should start pushing more data.

So in your _write function, if the socket.send call fails by either returning false or calling a callback with an error you should pause your stream. _read then can simple do this._paused = false

Might look like this.

UDPStream.prototype._read = function() {
  this._paused = false;
}

UDPStream.prototype._write = function(chunk, encoding, callback) {
  if(!this._paused)
   this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
};


回答2:

The answer is fairly straightforward: if there is truly no way to apply backpressure to your underlying resource, your _read implementation is simply empty. The stream will take care of queueing your pushed data until it hits the highWaterMark, but guarantees nothing beyond that point. The docs say that you should "simply provide data whenever it becomes available."