2流:
鉴于可读流
stream1
和stream2
,什么是地道(简洁)的方式来获得含流stream1
和stream2
级联 ?我不能做
stream1.pipe(outStream); stream2.pipe(outStream)
stream1.pipe(outStream); stream2.pipe(outStream)
因为这时的流内容被混在一起。N个流:
给定一个EventEmitter发射流数量不确定的,如
eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end')
什么是一个地道的(简洁)的方式来获得与连接在一起的所有流的流 ?
Answer 1:
该合并的流包会连接流。 从自述例如:
var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
我相信你有所有的流一次追加。 如果队列中运行,空, combinedStream
自动结束。 参见问题#5 。
该流流库是一个具有明确的替代.end
,但它更受欢迎,想必没有得到很好的测试。 它使用节点0.10的streams2 API(见讨论 )。
Answer 2:
这可以用香草来的NodeJS完成
import { PassThrough } from 'stream'
const merge = (...streams) => {
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams) {
pass = stream.pipe(pass, {end: false})
stream.once('end', () => --waiting === 0 && pass.emit('end'))
}
return pass
}
Answer 3:
一个简单的减少操作要精细的NodeJS !
const {PassThrough} = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
s.pipe(pt, {end: false})
s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
return pt
}, new PassThrough())
干杯;)
Answer 4:
你也许可以使它更加简洁,但这里有一个工程:
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream) {
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on('stream', function(stream){
stream.pause();
streams.push(stream);
ensureState();
});
streamStream.on('end', function() {
streamsEnded = true;
ensureState();
});
var ensureState = function() {
if(isStreaming) return;
if(streams.length == 0) {
if(streamsEnded)
that.emit('end');
return;
}
isStreaming = true;
streams[0].on('data', onData);
streams[0].on('end', onEnd);
streams[0].resume();
};
var onData = function(data) {
that.emit('data', data);
};
var onEnd = function() {
isStreaming = false;
streams[0].removeAllListeners('data');
streams[0].removeAllListeners('end');
streams.shift();
ensureState();
};
}
util.inherits(ConcatStream, EventEmitter);
我们保持与轨道状态的streams
(流的队列; push
到后面,并shift
从前面), isStreaming
和streamsEnded
。 当我们得到一个新的数据流,我们按下时,当流结束后,我们停止听和移动它。 当流的流结束,我们设置streamsEnded
。
在这些事件中,我们检查我们的状态。如果我们已经流(管道流),我们什么也不做。 如果队列为空, streamsEnded
设置,我们发出end
事件。 如果有东西在队列中,我们恢复它,听它的事件。
*请注意, pause
和resume
的咨询,所以一些流可能无法正常运行,并且需要缓冲。 这个练习留给读者。
做完这一切,我会做n=2
的构建情况EventEmitter
,创造了ConcatStream
它,并释放两种stream
事件之后的end
事件。 我敢肯定,它可以更简明地进行,但我们不妨用我们所拥有。
Answer 5:
https://github.com/joepie91/node-combined-stream2是一个下拉在用于联合流模块Streams2兼容的替代(其如上所述。)它自动换Streams1流。
用于联合流2实施例的代码:
var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
Answer 6:
streamee.js是一组基于节点1.0+流的流变压器和作曲家和包括串连方法:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);