I understand how to use writable streams in Node's new Streams2
library, but I don't understand how to use readable streams.
Take, for example, a stream wrapper around the dgram
module:
var dgram = require('dgram');
var thumbs = {
twiddle: function() {}
};
var defaults = {
address: '0.0.0.0',
type: 'udp4',
port: 12345,
broadcast: null,
multicast: null,
multicastTTL: 1
};
var UDPStream = function(options) {
if (!(this instanceof UDPStream))
return new UDPStream(options);
Duplex.call(this);
options = options || {};
this.address = options.address || defaults.address;
this.type = options.type || defaults.type;
this.port = options.port || defaults.port;
this.broadcast = options.broadcast || defaults.broadcast;
this.multicast = options.multicast || defaults.multicast;
this.multicastTTL = options.multicastTTL || defaults.multicastTTL;
this._socket = dgram.createSocket(this.type, setup.bind(this));
this._socket.on('message', this.push.bind(this));
};
util.inherits(UDPStream, Duplex);
var setup = function() {
if (this.multicast) {
this._socket.addMembership(this.multicast);
this._socket.setMulticastTTL(this.multicastTTL);
this.destination = this.multicast;
} else {
// default to using broadcast if multicast address is not specified.
this._socket.setBroadcast(true);
// TODO: get the default broadcast address from os.networkInterfaces() (not currently returned)
this.destination = this.broadcast || '255.255.255.255';
}
};
UDPStream.prototype._read = function(size) {
thumbs.twiddle();
};
UDPStream.prototype._write = function(chunk, encoding, callback) {
this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
callback();
};
module.exports = UDPStream;
Everything makes sense except for the _read
implementation. It's literally twiddling thumbs because I don't understand what I'm supposed to do there. My data is pushed when the udp socket emits a new message, but I have no way of pausing or resuming the underlying resource. What should this look like?
_read is part of the pause resume mechanism. From the NodeJS API docs
So in your _write function, if the
socket.send
call fails by either returning false or calling a callback with an error you should pause your stream._read
then can simple dothis._paused = false
Might look like this.
The answer is fairly straightforward: if there is truly no way to apply backpressure to your underlying resource, your
_read
implementation is simply empty. The stream will take care of queueing your pushed data until it hits thehighWaterMark
, but guarantees nothing beyond that point. The docs say that you should "simply provide data whenever it becomes available."