How to compose transform streams in Node.js

Posted 2020-08-12 09:02

Question:

I have a CSV parser implemented as a series of transform streams:

process.stdin
    .pipe(iconv.decodeStream('win1252'))
    .pipe(csv.parse())
    .pipe(buildObject())
    .pipe(process.stdout);

I'd like to abstract the parser (in its own module) and be able to do:

process.stdin
    .pipe(parser)
    .pipe(process.stdout);

where parser is just the composition of the previously used transform streams.

If I do

var parser = iconv.decodeStream('win1252')
    .pipe(csv.parse())
    .pipe(buildObject());

then parser is set to the buildObject() stream, because .pipe() returns its destination, and only this transform stream receives the data.

If I do

var parser = iconv.decodeStream('win1252');
parser
    .pipe(csv.parse())
    .pipe(buildObject());

it doesn't work either: .pipe(process.stdout) is called on the first transform stream, so the other two are bypassed.
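
Both failures can be reproduced with core streams alone. The following is a minimal sketch, not the original parser: head, tail, sink, and the upper() helper are hypothetical stand-ins for the iconv, csv, and buildObject stages.

```javascript
const { PassThrough, Transform } = require('stream');

// Hypothetical stand-in for a real transform stage: upper-cases text.
const upper = () => new Transform({
  transform(chunk, enc, cb) {
    cb(null, chunk.toString().toUpperCase());
  },
});

const head = new PassThrough();
const tail = upper();

// Case 1: pipe() returns its destination, so a chained expression
// evaluates to the last stream, not the first.
const chained = head.pipe(tail);
const chainedIsTail = chained === tail;

// Case 2: piping from the head taps the data before the later
// stages have touched it.
const sink = new PassThrough();
head.pipe(sink);

let seen = '';
sink.on('data', (chunk) => { seen += chunk; });
sink.on('end', () => console.log(chainedIsTail, seen));

head.end('abc');
```

Here sink receives the text exactly as written to head, without the upper-casing, which is the same bypass described in the second attempt.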

Any recommendation for an elegant composition of streams?

Answer 1:

Unfortunately, there is no built-in way to do that, but the multipipe package handles it nicely. Use it like this:

var multipipe = require('multipipe');

var parser = multipipe(iconv.decodeStream('win1252'), csv.parse(), buildObject());


Answer 2:

I've been struggling with this issue (and some others!). I found that Highland.js solved nearly all my problems. In this case its pipeline function did the trick:

var h = require('highland');
var parser = h.pipeline(iconv.decodeStream('win1252'), csv.parse(), buildObject());


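Answer 3:

I think this can be done natively now, by wrapping the chain in a Duplex. Writes go to the first stream and reads come from the last one, so each stage is free to emit zero, one, or many chunks per input chunk.

const { Duplex } = require('stream');

// Wrap a chain of streams in a single Duplex: writes are forwarded to the
// first stream and reads are served from the last one.
const compose = (...streams) => {
  const first = streams[0];
  const last = streams[streams.length - 1];

  const result = new Duplex({
    writableObjectMode: first.writableObjectMode,
    readableObjectMode: last.readableObjectMode,
    // Forward each chunk to the head, honouring its backpressure.
    write(chunk, enc, cb) {
      if (first.write(chunk, enc)) cb();
      else first.once('drain', cb);
    },
    // The writable side has ended, so end the head of the chain.
    final(cb) {
      first.end();
      cb();
    },
    // The consumer wants more data, so let the tail flow again.
    read() {
      last.resume();
    },
  });

  // Connect the stages and surface any stage's error on the wrapper.
  streams.forEach((stream) => stream.on('error', (error) => result.destroy(error)));
  streams.reduce((chain, stream) => chain.pipe(stream));

  // Relay output, pausing the tail while the wrapper's buffer is full.
  last.on('data', (chunk) => {
    if (!result.push(chunk)) last.pause();
  });
  last.on('end', () => result.push(null));

  return result;
};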
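
On newer Node.js versions, another option may be the built-in stream.compose, which returns a single Duplex for a list of streams. The sketch below assumes Node.js 16.9 or later, where the function is available but marked experimental in the Node.js documentation. The names splitLines, toRecord, makeParser, and collect are hypothetical stand-ins and are not part of the question's code.

```javascript
const { Readable, Transform, compose } = require('stream');

// Hypothetical stage: splits text into lines, so one input chunk
// can produce zero, one, or several output chunks.
const splitLines = () => {
  let rest = '';
  return new Transform({
    readableObjectMode: true,
    transform(chunk, enc, cb) {
      const lines = (rest + chunk.toString()).split('\n');
      rest = lines.pop(); // keep the trailing partial line for later
      for (const line of lines) this.push(line);
      cb();
    },
    flush(cb) {
      if (rest) this.push(rest);
      cb();
    },
  });
};

// Hypothetical stage: turns each line into a small record object.
const toRecord = () => new Transform({
  objectMode: true,
  transform(line, enc, cb) {
    cb(null, { fields: line.split(',') });
  },
});

// Build a fresh composed stream per use; a stream cannot be reused
// once it has ended.
const makeParser = () => compose(splitLines(), toRecord());

// Helper for the example: pipe text chunks through the parser and
// gather the resulting records.
async function collect(chunks) {
  const records = [];
  for await (const record of Readable.from(chunks).pipe(makeParser())) {
    records.push(record);
  }
  return records;
}

collect(['a,b\nc,', 'd\n']).then((records) => console.log(records));
```

Exporting a factory such as makeParser from the module, rather than a single stream instance, lets each caller write process.stdin.pipe(makeParser()).pipe(...) with its own pipeline.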