Streams read(0) instruction

2019-05-10 16:59发布

问题:

There is a code I found here https://github.com/substack/stream-handbook which reads 3 bytes from stream. And I do not understand how it works.

process.stdin.on('readable', function() {
    var buf = process.stdin.read(3);
    console.log(buf);
    process.stdin.read(0);
});

Being called like this:

(echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume.js

It returns:

<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

First of all, why do I need this .read(0) thing? Isn't stream has a buffer where the rest of data is stored until I request it by .read(size)? But without .read(0) it'll print

<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

Why?

The second is these sleep 1 instructions. If I call the script without it

(echo abc; echo def; echo ghi) | node consume.js

It'll print

<Buffer 61 62 63>
<Buffer 0a 64 65>

no matter will I use .read(0) or not. I don't understand this completely. What logic is used here to print such a result?

回答1:

I am not sure about what exactly the author of https://github.com/substack/stream-handbook tried to show using the read(0) approach, but IMHO this is the correct approach:

process.stdin.on('readable', function () {
  let buf;
  // Every time when the stream becomes readable (it can happen many times), 
  // read all available data from it's internal buffer in chunks of any necessary size.
  while (null !== (buf = process.stdin.read(3))) {
    console.dir(buf);
  }
});

You can change the chunk size, pass the input either with sleep or without it...



回答2:

I happened to learn NodeJS stream module these days. Here are some comments inside Readable.prototype.read function:

// if we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.

It said that after called .read(0), stream would just trigger (using the process.nextTick) another readable event if stream was not ended.

function emitReadable(stream) {
  // ...
  process.nextTick(emitReadable_, stream);
  // ...
}