NodeJS - Seeing data events from readable stream without a drain event from the writable stream

asked 2019-03-31 11:56

We are seeing extremely high memory usage with some of our streams in production. The files are stored in S3, and we open a readable stream on the S3 object and then pipe that data to a file on our local filesystem (on our EC2 instance). Some of our clients have extremely large files. In one instance a client had a file over 6GB in size, and the Node process handling it used so much memory that we exhausted almost all of our swap space and the machine slowed to a crawl. Obviously there is a memory leak somewhere, and that is what I'm trying to track down.

In the meantime, I augmented our code a bit to log when we see certain events from the streams. I have the code below and some sample output from the log with a small test file. What perplexes me is the fact that the readable stream receives a pause event and then continues to emit data and pause events WITHOUT the writable stream emitting a drain event. Am I completely missing something here? Once the readable stream is paused, how does it continue to emit data events prior to receiving a drain? The writable stream hasn't indicated that it is ready yet, so the readable stream should not be sending anything...right?

Yet look at the output. The first 3 events make sense to me: data, pause, drain. Then the next 3 are fine: data, data, pause. But THEN it emits another data and another pause event before finally getting a drain as the 9th event. I don't understand why events 7 and 8 occurred since the drain doesn't happen until the 9th event. Then again after the 9th event there are a bunch of data/pause pairs without any corresponding drain. Why? What I would expect is some number of data events, then a pause, and then NOTHING until a drain event occurs -- at which point data events could occur again. It seems to me that once a pause has occurred, no data events should occur at all until a drain event fires. Maybe I still fundamentally misunderstand something about Node streams?
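To make my mental model concrete, this is roughly the manual backpressure pattern I assumed pipe() uses internally (just a sketch to illustrate my assumption, not our actual code):

readable.on('data', function(chunk) {
  if (!writable.write(chunk)) {
    readable.pause(); // writable's buffer is full -- stop the flow
  }
});
writable.on('drain', function() {
  readable.resume(); // buffer flushed -- safe to emit data again
});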

UPDATE: The docs do not mention anything about a pause event being emitted by readable streams, but they do mention that a pause function is available. Presumably this gets called when the writable stream's write() returns false, and I would assume the pause function would emit the pause event. In any case, if pause() is invoked, the docs seem to jibe with my view of the world. See https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_stream_readable

This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.

This test was run on my development machine (Ubuntu 14.04 with Node v0.10.37). Our EC2 instances in prod are almost the same. I think they run v0.10.30 right now.

// Q (promises), _ (lodash/underscore), and fs are required at the top of this module, along with our logger
S3Service.prototype.getFile = function(bucket, key, fileName) {
  var deferred = Q.defer(),
    self = this,
    s3 = self.newS3(),
    fstream = fs.createWriteStream(fileName),
    shortname = _.last(fileName.split('/'));

  logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);

  // create a readable stream that will retrieve the file from S3
  var request = s3.getObject({
    Bucket: bucket,
    Key: key
  }).createReadStream();

  // if network request errors out then we need to reject
  request.on('error', function(err) {
      logger.error(err, 'Error encountered on S3 network request');
      deferred.reject(err);
    })
    .on('data', function() {
      logger.info('data event from readable stream for [%s]', shortname);
    })
    .on('pause', function() {
      logger.info('pause event from readable stream for [%s]', shortname);
    });

  // resolve when our writable stream closes, or reject if we get some error
  fstream.on('close', function() {
      logger.info('close event from writable stream for [%s] -- done writing file', shortname);
      deferred.resolve();
    })
    .on('error', function(err) {
      logger.error(err, 'Error encountered writing stream to [%s]', fileName);
      deferred.reject(err);
    })
    .on('drain', function() {
      logger.info('drain event from writable stream for [%s]', shortname);
    });

  // pipe the S3 request stream into a writable file stream
  request.pipe(fstream);

  return deferred.promise;
};
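For context, callers use it roughly like this (the bucket, key, and path here are made up for illustration):

s3Service.getFile('some-bucket', 'exports/FeedItem.csv', '/tmp/FeedItem.csv')
  .then(function() {
    logger.info('file downloaded');
  })
  .fail(function(err) {
    logger.error(err, 'file download failed');
  });

And here is the log output from one run against a small test file: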

[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.688Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.691Z] INFO: worker/7525 on bdmlinux: close event from writable stream for [FeedItem.csv] -- done writing file

1 Answer

对你真心纯属浪费 · answered 2019-03-31 12:45

It's possible you have some quantum-like "observing the phenomenon changes the outcome" situation here. Node introduced a new way of streaming in v0.10. From the docs:

If you attach a data event listener, then it will switch the stream into flowing mode, and data will be passed to your handler as soon as it is available.

That is, attaching a data listener switches the stream back into the classic flowing mode, which may be why you're seeing behavior that's inconsistent with what you read in the rest of the docs. To observe things unobtrusively, you could remove your on('data') handler and instead insert your own stream in between using through, like this:

var through = require('through');

// a pass-through stream that logs each chunk before forwarding it downstream
var observer = through(function write(data) {
    console.log('Data!');
    this.queue(data); // forward the chunk unchanged
}, function end() {
    this.queue(null); // signal end-of-stream downstream
});

request.pipe(observer).pipe(fstream);
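Because the data listener now lives on the observer rather than on the S3 request stream, your logging code no longer forces the source into flowing mode, pipe() stays in control of pausing and resuming, and you still get a log line for every chunk that passes through.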