I have a csv parser implemented as a series of transform streams:
process.stdin
  .pipe(iconv.decodeStream('win1252'))
  .pipe(csv.parse())
  .pipe(buildObject())
  .pipe(process.stdout);
I'd like to abstract the parser (in its own module) and be able to do:
process.stdin
  .pipe(parser)
  .pipe(process.stdout);
where parser is just the composition of the previously used transform streams.
If I do
var parser = iconv.decodeStream('win1252')
  .pipe(csv.parse())
  .pipe(buildObject());
then parser is set to the buildObject() stream (since .pipe() returns its destination), and only that last transform stream receives the data.
If I do
var parser = iconv.decodeStream('win1252');
parser
  .pipe(csv.parse())
  .pipe(buildObject());
it doesn't work either, as .pipe(process.stdout) is then called on the first transform stream, so the other two are bypassed.
Any recommendation for an elegant composition of streams?
Unfortunately, there is no built-in way to do that, but there is the handy multipipe package.
Use it like this:
var multipipe = require('multipipe');
var parser = multipipe(iconv.decodeStream('win1252'), csv.parse(), buildObject());
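With that, parser should be a single duplex stream (multipipe takes care of the wiring and of forwarding errors from each stage), so it can be dropped straight into the pipeline from the question:

process.stdin
  .pipe(parser)
  .pipe(process.stdout);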
I've been struggling with this issue (and some others!) and found that highlandjs solved nearly all my problems. In this case, its pipeline function did the trick:
var h = require('highland');
var parser = h.pipeline(iconv.decodeStream('win1252'), csv.parse(), buildObject());
I think this can be done natively now.
const { PassThrough, Transform } = require('stream');

const compose = (...streams) => {
  const first = new PassThrough();
  const last = new PassThrough();
  const result = new Transform();
  // (For streams of objects, pass { objectMode: true } to the three streams above.)

  // Pipe the streams together end to end and forward any error from the
  // chain to the composed stream so it isn't swallowed.
  [first, ...streams, last].reduce((chain, stream) => {
    stream.on('error', (error) => result.emit('error', error));
    return chain.pipe(stream);
  });

  // Everything that falls out of the tail of the chain is the composed
  // output. Using .on (not .once) avoids dropping chunks when an inner
  // stream emits several output chunks for a single input chunk.
  last.on('data', (chunk) => result.push(chunk));

  result._transform = (chunk, enc, cb) => {
    // Feed the chunk into the head of the chain; the write callback
    // signals when it has been accepted.
    first.write(chunk, enc, cb);
  };

  result._flush = (cb) => {
    // End the chain and wait for the tail to finish before ending result.
    last.once('end', () => cb());
    first.end();
  };

  return result;
};
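Since then this has also landed in Node itself: stream.compose() (added in v16.9.0, marked experimental for a while) combines a list of streams into a single duplex stream and forwards errors for you. A minimal sketch, assuming the same iconv, csv.parse and buildObject streams from the question:

const { compose } = require('stream');

const parser = compose(
  iconv.decodeStream('win1252'),
  csv.parse(),
  buildObject()
);

process.stdin.pipe(parser).pipe(process.stdout);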