Node.js: Piping the same readable stream into multiple (writable) targets

2019-01-04 09:42发布

I need to run two commands in series that both read data from the same stream. After a stream has been piped into another, its buffer is emptied, so I can't read data from that stream again. As a result, this doesn't work:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

// Start the download once and hand its body to the `identify` command.
const inputStream = request('http://placehold.it/640x360');
const identify = spawn('identify', ['-']);

inputStream.pipe(identify.stdin);

// Accumulate everything `identify` prints so it can be parsed as a whole.
const chunks = [];
identify.stdout.on('data', (chunk) => {
  chunks.push(chunk);
});

identify.stdout.on('end', () => {
  const width = getSize(Buffer.concat(chunks));
  const convertArgs = ['-', '-scale', width * 0.5, 'png:-'];
  const convert = spawn('convert', convertArgs);
  // Second pipe of the same request stream, after its data has already been
  // consumed above; this is the call that raises
  // "You cannot pipe after data has been emitted from the response."
  inputStream.pipe(convert.stdin);
  const halfSizeFile = fs.createWriteStream('half.png');
  convert.stdout.pipe(halfSizeFile);
});

/**
 * Extract the image width from the text printed by `identify`.
 *
 * Reads the third space-separated field of the output (presumably of the
 * form `640x360`) and parses the part before the `x` as a decimal integer.
 *
 * @param {Buffer} buffer - Collected output; decoded with `toString()`.
 * @returns {number} The width, or NaN when that part is not numeric.
 * @throws {TypeError} When the output has fewer than three fields.
 */
function getSize(buffer){
  const fields = buffer.toString().split(' ');
  const width = fields[2].split('x')[0];
  // Explicit radix: the digits are always interpreted as base 10.
  return Number.parseInt(width, 10);
}

Request complains about this

Error: You cannot pipe after data has been emitted from the response.

Changing the inputStream to fs.createReadStream yields the same issue, of course. I don't want to write to a file; I want to reuse, in some way, the stream that request produces (or any other stream, for that matter).

Is there a way to reuse a readable stream once it finishes piping? What would be the best way to accomplish something like the above example?

5条回答
放我归山
2楼-- · 2019-01-04 10:16

What about piping into two or more streams, but not at the same time?

For example :

var PassThrough = require('stream').PassThrough;
// NOTE(review): `stream` and `fs` are not defined in this snippet; presumably
// an audio source object and require('fs') set up elsewhere. Confirm.
var mybinaryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'});
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'});
// Create an actual pass-through instance; the bare constructor is not a stream.
var mypass = new PassThrough();
mybinaryStream.pipe(mypass);
mypass.pipe(file1);
// file2 is attached as a second destination two seconds after file1.
setTimeout(function(){
   mypass.pipe(file2);
},2000);

The above code does not produce any errors, but file2 is empty.

查看更多
混吃等死
3楼-- · 2019-01-04 10:19

You have to create duplicate of the stream by piping it to two streams. You can create a simple stream with a PassThrough stream, it simply passes the input to the output.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

// One source process whose stdout is duplicated into two pass-through copies.
const a = spawn('echo', ['hi user']);
const [b, c] = [new PassThrough(), new PassThrough()];

for (const copy of [b, c]) {
  a.stdout.pipe(copy);
}

// First consumer: add up the number of bytes delivered through b.
let count = 0;
b.on('data', (chunk) => {
  count = count + chunk.length;
});

// Once b is finished, report the total and only then start draining c,
// which has been holding its own copy of the data in the meantime.
b.on('end', () => {
  console.log(count);
  c.pipe(process.stdout);
});

Output:

8
hi user
查看更多
你好瞎i
4楼-- · 2019-01-04 10:22

I have a different solution that writes to two streams simultaneously. Naturally, the time to write will be the sum of the two write times. I use it to respond to a download request when I also want to keep a copy of the downloaded file on my server (I actually use an S3 backup, so I cache the most-used files locally to avoid repeated file transfers).

/**
 * A utility class made to write to a file while answering a file download request.
 *
 * It exposes the small subset of the writable interface used here
 * (`setHeader`, `write`, `end`) and forwards every call to both wrapped
 * destinations, first `streamOne`, then `streamTwo`.
 */
class TwoOutputStreams {
  /**
   * @param {object} streamOne - First destination; must provide `write` and `end`.
   * @param {object} streamTwo - Second destination; must provide `write` and `end`.
   */
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  /**
   * Forward a header to every destination that supports headers.
   * Destinations without a `setHeader` method are skipped.
   *
   * @param {string} header - Header name.
   * @param {*} value - Header value.
   */
  setHeader(header, value) {
    for (const stream of [this.streamOne, this.streamTwo]) {
      if (stream.setHeader) {
        stream.setHeader(header, value)
      }
    }
  }

  /**
   * Write the same chunk to both destinations.
   *
   * @param {*} chunk - Data to write.
   * @param {string} [encoding] - Optional encoding, forwarded unchanged.
   * @returns {boolean} `false` when either destination returned `false`
   *   (i.e. asked the caller to wait for 'drain'), otherwise `true`.
   */
  write(chunk, encoding) {
    // Both writes must always happen, so evaluate them before combining.
    const firstAccepted = this.streamOne.write(chunk, encoding)
    const secondAccepted = this.streamTwo.write(chunk, encoding)
    return firstAccepted !== false && secondAccepted !== false
  }

  /**
   * Finish both destinations, optionally writing one last chunk first.
   *
   * @param {*} [chunk] - Optional final data; previously this was dropped.
   * @param {string} [encoding] - Optional encoding for the final chunk.
   */
  end(chunk, encoding) {
    if (chunk != null) {
      this.write(chunk, encoding)
    }
    this.streamOne.end()
    this.streamTwo.end()
  }
}

You can then use this as a regular OutputStream

// Wrap both destinations in one object; fileOut and responseStream are not
// defined in this snippet and are assumed to be created by the caller.
const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

and pass it to your method as if it were a response or a fileOutputStream.

查看更多
迷人小祖宗
5楼-- · 2019-01-04 10:27

For the general problem, the following code works fine:

var PassThrough = require('stream').PassThrough
// One source stream fanned out to two destinations. Declared with const so
// the snippet does not rely on implicit globals (which throw in strict mode).
const a = new PassThrough()
const b1 = new PassThrough()
const b2 = new PassThrough()
a.pipe(b1)
a.pipe(b2)
// Each destination logs every chunk it receives, tagged with its own name.
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')
查看更多
做自己的国王
6楼-- · 2019-01-04 10:30

The first answer only works if streams take roughly the same amount of time to process data. If one takes significantly longer, the faster one will request new data, consequently overwriting the data still being used by the slower one (I had this problem after trying to solve it using a duplicate stream).

The following pattern worked very well for me. It uses a library based on Stream2 streams, Streamz, and Promises to synchronize async streams via a callback. Using the familiar example from the first answer:

const spawn = require('child_process').spawn;
const pass = require('stream').PassThrough;
const streamz = require('streamz').PassThrough;
const Promise = require('bluebird');

const a = spawn('echo', ['hi user']);
const b = new pass();
const c = new pass();

a.stdout.pipe(streamz(combineStreamOperations));

/**
 * Handler given to streamz; presumably invoked once per chunk, with `next`
 * used to ask for the following chunk (confirm against the streamz docs).
 *
 * @param {*} data - The current chunk.
 * @param {Function} next - Callback that requests more data.
 */
function combineStreamOperations(data, next){
  // NOTE(review): `data` is never written to b or c in the visible code, and
  // b and c are streams rather than promises. Confirm the intended per-chunk
  // operations before relying on this example.
  Promise.join(b, c, function(b, c){ //perform n operations on the same data
    next(); //request more
  });
}

// Count the bytes seen on b; when b ends, print the total and drain c.
let count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });
查看更多
登录 后发表回答