Question:
Intro
These are my first adventures in writing server-side node.js code. It's been
fun so far, but I'm having some difficulty understanding the proper way
to implement something as it relates to node.js streams.
Problem
For test and learning purposes I'm working with large files whose
content is zlib compressed. The compressed content is binary data, each
packet being 38 bytes in length. I'm trying to create a resulting file
that looks almost identical to the original file except that there is an
uncompressed 31-byte header for every 1024 38-byte packets.
original file content (decompressed)
+----------+----------+----------+----------+
| packet 1 | packet 2 | ...... | packet N |
| 38 bytes | 38 bytes | ...... | 38 bytes |
+----------+----------+----------+----------+
resulting file content
+----------+--------------------------------+----------+--------------------------------+
| header 1 | 1024 38 byte packets | header 2 | 1024 38 byte packets |
| 31 bytes | zlib compressed | 31 bytes | zlib compressed |
+----------+--------------------------------+----------+--------------------------------+
As you can see, it's somewhat of a translation problem. Meaning, I'm
taking some source stream as input and then slightly transforming it
into some output stream. Therefore, it felt natural to implement a
Transform stream.
The class simply attempts to accomplish the following:
- Takes a stream as input
- zlib-inflates the chunks of data to count the packets, groups 1024 of them
together, zlib-deflates the group, and prepends a header.
- Passes the new resulting chunk on through the pipeline via this.push(chunk).
(A minimal skeleton of this shape is sketched just below.)
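For reference, a minimal sketch of the shape such a module could take; the actual
inflating, packet grouping, deflating, and header logic are left as TODOs here,
since they are what this question is about:
var stream = require('stream');
var util = require('util');

function MyTranslate(options) {
    stream.Transform.call(this, options);
}
util.inherits(MyTranslate, stream.Transform);

MyTranslate.prototype._transform = function(chunk, encoding, done) {
    // TODO: inflate, count packets, group 1024 of them, deflate the group,
    // prepend the 31 byte header, then this.push() the result.
    done();
};

MyTranslate.prototype._flush = function(done) {
    // TODO: emit any partially filled group of packets.
    done();
};

exports.createMyTranslate = function(options) {
    return new MyTranslate(options);
};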
A use case would be something like:
var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);
Question(s)
Assuming Transform is a good choice for this use case, I seem to be
running into a possible back-pressure issue. My call to this.push(chunk)
within _transform keeps returning false. Why would this be, and how should
I handle it?
Answer 1:
I think Transform is suitable for this, but I would perform the inflate as a
separate step in the pipeline.
Here's a quick and largely untested example:
var zlib = require('zlib');
var stream = require('stream');
var transformer = new stream.Transform();
// Properties used to keep internal state of transformer.
transformer._buffers = [];
transformer._inputSize = 0;
transformer._targetSize = 1024 * 38;
// Dump one 'output packet'
transformer._dump = function(done) {
    // Concatenate the buffered chunks into a single Buffer.
    var buffer = Buffer.concat(this._buffers);
    // Take the first 1024 packets' worth of bytes.
    var packetBuffer = buffer.slice(0, this._targetSize);
    // Keep the rest and reset the counter.
    this._buffers = [ buffer.slice(this._targetSize) ];
    this._inputSize = this._buffers[0].length;
    // output header (placeholder for the real 31 byte header)
    this.push('HELLO WORLD');
    // output compressed packet buffer
    zlib.deflate(packetBuffer, function(err, compressed) {
        if (err) {
            return done ? done(err) : this.emit('error', err);
        }
        this.push(compressed);
        if (done) {
            done();
        }
    }.bind(this));
};
// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform = function(chunk, encoding, done) {
    this._buffers.push(chunk);
    this._inputSize += chunk.length;
    if (this._inputSize >= this._targetSize) {
        this._dump(done);
    } else {
        done();
    }
};
// Flush any remaining buffers.
transformer._flush = function(done) {
    this._dump(done);
};
// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
.pipe(zlib.createInflate())
.pipe(transformer)
.pipe(fs.createWriteStream('depth_1000000.out'));
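One caveat with plain pipe() chains is that errors are not forwarded along the
chain; on Node 10+ the same chain can be written with stream.pipeline, which
reports any failure through a single callback. A minimal sketch, assuming the
same file names:
var fs = require('fs');
var zlib = require('zlib');
var stream = require('stream');

stream.pipeline(
    fs.createReadStream('depth_1000000'),
    zlib.createInflate(),
    transformer,
    fs.createWriteStream('depth_1000000.out'),
    function(err) {
        if (err) {
            console.error('Pipeline failed:', err);
        }
    }
);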
Answer 2:
push will return false if the stream you are writing to (in this case, a file
output stream) has too much data buffered. Since you're writing to disk, this
makes sense: you are processing data faster than you can write it out.
When out's buffer is full, your transform stream will fail to push, and start
buffering data itself. If that buffer should fill, then inp's will start to fill.
This is how things should be working. The piped streams are only going to
process data as fast as the slowest link in the chain can handle it (once your
buffers are full).
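A minimal sketch of this behaviour, assuming a deliberately slow Writable with a
tiny highWaterMark as the destination: push() starts returning false once the
downstream buffers fill.
const { Transform, Writable } = require('stream');

// A slow sink with a tiny buffer, so back-pressure kicks in quickly.
const slowSink = new Writable({
    highWaterMark: 16,
    write(chunk, encoding, callback) {
        setTimeout(callback, 10); // simulate a slow disk
    }
});

// A pass-through transform that logs whenever push() signals back-pressure.
const noisy = new Transform({
    transform(chunk, encoding, callback) {
        if (!this.push(chunk)) {
            console.error('push() returned false: downstream is saturated');
        }
        callback();
    }
});

process.stdin.pipe(noisy).pipe(slowSink);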
Answer 3:
This question from 2013 is all I was able to find on how to deal with "back pressure"
when creating node Transform streams.
From the node 7.10.0 Transform stream and Readable stream documentation, what I
gathered was that once push returned false, nothing else should be pushed until
_read was called.
The Transform documentation doesn't mention _read except to mention that the base
Transform class implements it (and _write). I found the information about push
returning false and _read being called in the Readable stream documentation.
The only other authoritative comment I found on Transform back pressure only mentioned
it as an issue, and that was in a comment at the top of the node file _stream_transform.js.
Here's the section about back pressure from that comment:
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk. However,
// a pathological inflate type of transform can cause excessive buffering
// here. For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output. In this case, you could write a very small
// amount of input, and end up with a very large amount of output. In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform. A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
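To make that scenario concrete, a minimal sketch of the pathological inflating
transform described above, where each input byte is read as an integer 0-255 and
expands to that many output bytes:
const { Transform } = require('stream');

const pathologicalInflate = new Transform({
    transform(chunk, encoding, callback) {
        for (const byte of chunk) {
            // Each input byte becomes `byte` zero-filled output bytes,
            // e.g. the 4 bytes {ff,ff,ff,ff} produce 4 * 255 = 1020 bytes.
            if (byte > 0) {
                this.push(Buffer.alloc(byte));
            }
        }
        callback();
    }
});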
Solution example
Here's the solution I pieced together to handle the back pressure in a Transform stream
which I'm pretty sure works. (I haven't written any real tests, which would require
writing a Writable stream to control the back pressure.)
This is a rudimentary line transform which would need more work to be a proper
line transform, but it does demonstrate handling the "back pressure".
const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);
        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines, stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
        {
            while (nextLine < lines.length)
            {
                // push() still queues this line even when it returns false;
                // false means we've got back pressure, so stop here and wait
                // for _read() to call _continueTransform again.
                if (!this.push(lines[nextLine++] + "\n"))
                    return;
            }

            // DEBUG: Uncomment for debugging help to see what's going on
            //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

            // All lines are pushed, remove this function from the LineTransform instance
            this._continueTransform = null;
            return callback();
        };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }
        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}
I tested the above by running it with the DEBUG lines uncommented on a ~10000 line,
~200KB file. Redirect stdout or stderr to a file (or both) to separate the debugging
statements from the expected output (node test.js > out.log 2> err.log).
const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);
Helpful debugging hint
While writing this initially I didn't realize that _read could be called before
_transform returned, so I hadn't implemented the this._transforming guard and I
was getting the following error:
Error: no writecb in Transform class
at afterTransform (_stream_transform.js:71:33)
at TransformState.afterTransform (_stream_transform.js:54:12)
at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
at LineTransform.Transform._read (_stream_transform.js:167:10)
at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
at LineTransform.Transform._write (_stream_transform.js:155:12)
at doWrite (_stream_writable.js:331:12)
at writeOrBuffer (_stream_writable.js:317:5)
at LineTransform.Writable.write (_stream_writable.js:243:11)
Looking at the node implementation I realized that this error meant that the
callback given to _transform was being called more than once. There wasn't much
information to be found about this error either, so I thought I'd include what I
figured out here.
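In other words, the error boils down to a _transform whose callback ends up being
invoked twice, roughly like this hypothetical example (doWorkAsync stands in for
any asynchronous step):
_transform(chunk, encoding, callback)
{
    // An asynchronous step that calls back later...
    doWorkAsync(chunk, () => callback());

    // ...plus a synchronous call to the same callback: the second invocation
    // is what triggers "Error: no writecb in Transform class".
    callback();
}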
Answer 4:
Ran into a similar problem lately, needing to handle backpressure in an inflating
transform stream - the secret to handling push() returning false is to register
and handle the 'drain' event on the stream:
_transform(data, enc, callback) {
    const continueTransforming = () => {
        // ... do some work / parse the data, keep state of where we're at etc.
        if (!this.push(event))
            this._readableState.pipes.once('drain', continueTransforming); // will get called again when the reader can consume more data
        if (allDone)
            callback();
    }
    continueTransforming()
}
NOTE this is a bit hacky as we're reaching into the internals, and pipes can even
be an array of destination streams, but it does work in the common case of
....pipe(transform).pipe(...
Would be great if someone from the Node community can suggest a "correct" method
for handling .push() returning false.
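A less internals-dependent variant of the same idea, assuming the transform keeps
an explicit reference to its downstream Writable (this.destination and parse below
are illustrative assumptions), is to wait for that stream's 'drain' event with
events.once (Node 11.13+):
// at module level: const { once } = require('events');

async _transform(data, enc, callback) {
    for (const event of parse(data)) {
        if (!this.push(event)) {
            // wait until the downstream writer signals 'drain' before pushing more
            await once(this.destination, 'drain');
        }
    }
    callback();
}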
Answer 5:
I ended up following Ledion's example and created a utility Transform class which assists with backpressure. The utility adds an async method named addData, which the implementing Transform can await.
'use strict';

const { Transform } = require('stream');

/**
 * The BackPressureTransform class adds a utility method addData which
 * allows for pushing data to the Readable, while honoring back-pressure.
 */
class BackPressureTransform extends Transform {
  constructor(...args) {
    super(...args);
  }

  /**
   * Asynchronously add a chunk of data to the output, honoring back-pressure.
   *
   * @param {String} data
   *   The chunk of data to add to the output.
   *
   * @returns {Promise<void>}
   *   A Promise resolving after the data has been added.
   */
  async addData(data) {
    // if .push() returns false, it means that the readable buffer is full
    // when this occurs, we must wait for the internal readable to emit
    // the 'drain' event, signalling the readable is ready for more data
    if (!this.push(data)) {
      await new Promise((resolve, reject) => {
        const errorHandler = error => {
          this.emit('error', error);
          reject();
        };
        this._readableState.pipes.on('error', errorHandler);
        this._readableState.pipes.once('drain', () => {
          this._readableState.pipes.removeListener('error', errorHandler);
          resolve();
        });
      });
    }
  }
}

module.exports = {
  BackPressureTransform
};
Using this utility class, my Transforms look like this now:
'use strict';

const { BackPressureTransform } = require('./back-pressure-transform');

/**
 * The Formatter class accepts the transformed row to be added to the output file.
 * The class provides generic support for formatting the result file.
 */
class Formatter extends BackPressureTransform {
  constructor() {
    super({
      encoding: 'utf8',
      readableObjectMode: false,
      writableObjectMode: true
    });

    this.anyObjectsWritten = false;
  }

  /**
   * Called when the data pipeline is complete.
   *
   * @param {Function} callback
   *   The function which is called when final processing is complete.
   *
   * @returns {Promise<void>}
   *   A Promise resolving after the flush completes.
   */
  async _flush(callback) {
    // if any object is added, close the surrounding array
    if (this.anyObjectsWritten) {
      await this.addData('\n]');
    }

    callback(null);
  }

  /**
   * Given the transformed row from the ETL, format it to the desired layout.
   *
   * @param {Object} sourceRow
   *   The transformed row from the ETL.
   *
   * @param {String} encoding
   *   Ignored in object mode.
   *
   * @param {Function} callback
   *   The callback function which is called when the formatting is complete.
   *
   * @returns {Promise<void>}
   *   A Promise resolving after the row is transformed.
   */
  async _transform(sourceRow, encoding, callback) {
    // before the first object is added, surround the data as an array
    // between each object, add a comma separator
    await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');

    // update state
    this.anyObjectsWritten = true;

    // add the object to the output
    const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
    for (const [index, row] of parsed.entries()) {
      // prepend the row with 2 additional spaces since we're inside a larger array
      await this.addData(`  ${row}`);

      // add line breaks except for the last row
      if (index < parsed.length - 1) {
        await this.addData('\n');
      }
    }

    callback(null);
  }
}

module.exports = {
  Formatter
};
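A usage sketch, assuming a hypothetical object-mode source of row objects and
output path:
'use strict';

const fs = require('fs');
const { Formatter } = require('./formatter'); // hypothetical module path

// someObjectModeSource stands in for any object-mode Readable emitting row objects.
someObjectModeSource
  .pipe(new Formatter())
  .pipe(fs.createWriteStream('output.json'));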
Answer 6:
Mike Lippert's answer is the closest to the truth, I think. It appears that waiting
for the reading side to call _read() again is the only way that the Transform is
actively notified that the reader is ready. I wanted to share a simple example of
how I override _read() temporarily.
_transform(buf, enc, callback) {
    // prepend any unused data from the prior chunk.
    if (this.prev) {
        buf = Buffer.concat([ this.prev, buf ]);
        this.prev = null;
    }

    // will keep transforming until buf runs low on data.
    if (buf.length < this.requiredData) {
        this.prev = buf;
        return callback();
    }

    var result = buf.slice(0, this.requiredData); // placeholder: do something with the data...
    var nextbuf = buf.slice(this.requiredData);

    if (this.push(result)) {
        // Continue transforming this chunk
        this._transform(nextbuf, enc, callback);
    }
    else {
        // Node is warning us to slow down (applying "backpressure")
        // Temporarily override _read request to continue the transform
        this._read = function() {
            delete this._read;
            this._transform(nextbuf, enc, callback);
        };
    }
}
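For context, a minimal class wrapper around this method could look like the
following sketch (the 38-byte record size is only an illustrative assumption
echoing the original question):
const { Transform } = require('stream');

class FixedRecordTransform extends Transform {
    constructor(options) {
        super(options);
        this.requiredData = 38; // bytes needed before one record can be processed
        this.prev = null;       // leftover bytes carried over from the prior chunk
    }

    _transform(buf, enc, callback) {
        // ... method body as shown above ...
    }
}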