连接两个(或n)的流(Concatenate two (or n) streams)

2019-09-02 05:10发布

  • 2流:

    鉴于可读流 stream1stream2 ,什么是地道(简洁)的方式来获得含流stream1stream2级联

    我不能做stream1.pipe(outStream); stream2.pipe(outStream) stream1.pipe(outStream); stream2.pipe(outStream)因为这时的流内容被混在一起。

  • N个流:

    给定一个EventEmitter发射流数量不确定的,如

     eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end') 

    什么是一个地道的(简洁)的方式来获得与连接在一起的所有流的流

Answer 1:

该合并的流包会连接流。 从自述例如:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

我相信你有所有的流一次追加。 如果队列中运行,空, combinedStream自动结束。 参见问题#5 。

该流流库是一个具有明确的替代.end ,但它更受欢迎,想必没有得到很好的测试。 它使用节点0.10的streams2 API(见讨论 )。



Answer 2:

这可以用香草来的NodeJS完成

import { PassThrough } from 'stream'
const merge = (...streams) => {
    let pass = new PassThrough()
    let waiting = streams.length
    for (let stream of streams) {
        pass = stream.pipe(pass, {end: false})
        stream.once('end', () => --waiting === 0 && pass.emit('end'))
    }
    return pass
}


Answer 3:

一个简单的减少操作要精细的NodeJS !

const {PassThrough} = require('stream')

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
  s.pipe(pt, {end: false})
  s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
  return pt
}, new PassThrough())

干杯;)



Answer 4:

你也许可以使它更加简洁,但这里有一个工程:

var util = require('util');
var EventEmitter = require('events').EventEmitter;

function ConcatStream(streamStream) {
  EventEmitter.call(this);
  var isStreaming = false,
    streamsEnded = false,
    that = this;

  var streams = [];
  streamStream.on('stream', function(stream){
    stream.pause();
    streams.push(stream);
    ensureState();
  });

  streamStream.on('end', function() {
    streamsEnded = true;
    ensureState();
  });

  var ensureState = function() {
    if(isStreaming) return;
    if(streams.length == 0) {
      if(streamsEnded)
        that.emit('end');
      return;
    }
    isStreaming = true;
    streams[0].on('data', onData);
    streams[0].on('end', onEnd);
    streams[0].resume();
  };

  var onData = function(data) {
    that.emit('data', data);
  };

  var onEnd = function() {
    isStreaming = false;
    streams[0].removeAllListeners('data');
    streams[0].removeAllListeners('end');
    streams.shift();
    ensureState();
  };
}

util.inherits(ConcatStream, EventEmitter);

我们保持与轨道状态的streams (流的队列; push到后面,并shift从前面), isStreamingstreamsEnded 。 当我们得到一个新的数据流,我们按下时,当流结束后,我们停止听和移动它。 当流的流结束,我们设置streamsEnded

在这些事件中,我们检查我们的状态。如果我们已经流(管道流),我们什么也不做。 如果队列为空, streamsEnded设置,我们发出end事件。 如果有东西在队列中,我们恢复它,听它的事件。

*请注意, pauseresume的咨询,所以一些流可能无法正常运行,并且需要缓冲。 这个练习留给读者。

做完这一切,我会做n=2的构建情况EventEmitter ,创造了ConcatStream它,并释放两种stream事件之后的end事件。 我敢肯定,它可以更简明地进行,但我们不妨用我们所拥有。



Answer 5:

https://github.com/joepie91/node-combined-stream2是一个下拉在用于联合流模块Streams2兼容的替代(其如上所述。)它自动换Streams1流。

用于联合流2实施例的代码:

var CombinedStream = require('combined-stream2');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));


Answer 6:

streamee.js是一组基于节点1.0+流的流变压器和作曲家和包括串连方法:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);


文章来源: Concatenate two (or n) streams