NodeJS - data events on a paused readable stream without a corresponding drain from the writable stream

We are seeing extremely high memory usage with some of our streams in production. The files are stored in S3; we open a readable stream on the S3 object and then pipe that data to a file on the local file system (on our EC2 instance). Some of our clients have very large files. In one instance they had a file over 6GB in size, and the node process handling it used so much memory that we nearly exhausted all of our swap space and the machine slowed to a crawl. Clearly there is a memory leak somewhere, and that is what I am trying to track down.

In the meantime, I modified our code slightly to log certain events we see from the streams. Below is the code along with some sample log output from a run with a small test file. What baffles me is that the readable stream receives a pause event and then keeps emitting data and pause events WITHOUT the writable stream ever emitting a drain event. Am I completely missing something here? Once the readable stream is paused, how can it keep emitting data events before a drain is received? The writable stream has not yet indicated that it is ready, so the readable stream should not be sending anything... right?
Yet look at the output. The first three events make sense to me: data, pause, drain. The next three are fine too: data, data, pause. But then it emits another data and another pause before finally getting a drain as the ninth event. I do not understand why events 7 and 8 occur, since the drain does not show up until event 9. And after event 9 there is a run of data/pause pairs with no corresponding drain at all. Why? What I expected was some number of data events, then a pause, and then NOTHING until a drain event fires, at which point data events can resume. As I understand it, once a pause has occurred no further data events should fire until a drain does. Maybe I still fundamentally misunderstand something about Node streams?
UPDATE: The documentation never mentions readable streams emitting a pause event, but it does document the pause function. Presumably pause() is called when a write to the writable stream returns false, and I assume the pause function is what emits the pause event. In any case, if pause() is invoked, the documentation seems to match my view of the world. See https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_stream_readable

This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
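In other words, the backpressure loop I assumed pipe() implements looks roughly like this (a sketch with hypothetical src/dest streams, not our actual S3 code):

```javascript
// Sketch of the backpressure contract I expected from pipe():
// stop the readable when write() returns false, and only start
// it again once the writable emits 'drain'.
function copyWithBackpressure(src, dest, done) {
  src.on('data', function (chunk) {
    if (!dest.write(chunk)) {
      src.pause(); // writable buffer is over its high-water mark
      dest.once('drain', function () {
        src.resume(); // writable signaled it is ready for more
      });
    }
  });
  src.on('end', function () {
    dest.end();
    if (done) done();
  });
}
```

Under this model, no data event should ever land between a pause() and the next drain, which is exactly what my log output contradicts.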

This test was run on my development machine (Ubuntu 14.04 with Node v0.10.37). Our EC2 instances in production are nearly identical; I believe they currently run v0.10.30.
S3Service.prototype.getFile = function(bucket, key, fileName) {
  var deferred = Q.defer(),
    self = this,
    s3 = self.newS3(),
    fstream = fs.createWriteStream(fileName),
    shortname = _.last(fileName.split('/'));

  logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);

  // create a readable stream that will retrieve the file from S3
  var request = s3.getObject({
    Bucket: bucket,
    Key: key
  }).createReadStream();

  // if network request errors out then we need to reject
  request.on('error', function(err) {
      logger.error(err, 'Error encountered on S3 network request');
      deferred.reject(err);
    })
    .on('data', function() {
      logger.info('data event from readable stream for [%s]', shortname);
    })
    .on('pause', function() {
      logger.info('pause event from readable stream for [%s]', shortname);
    });

  // resolve when our writable stream closes, or reject if we get some error
  fstream.on('close', function() {
      logger.info('close event from writable stream for [%s] -- done writing file', shortname);
      deferred.resolve();
    })
    .on('error', function(err) {
      logger.error(err, 'Error encountered writing stream to [%s]', fileName);
      deferred.reject(err);
    })
    .on('drain', function() {
      logger.info('drain event from writable stream for [%s]', shortname);
    });

  // pipe the S3 request stream into a writable file stream
  request.pipe(fstream);

  return deferred.promise;
};

[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [Feed

1 Answer

You may have a quantum-like situation here, where observing the phenomenon changes the outcome. Node introduced a new way of streaming in v0.10. From the docs:
If you attach a data event listener, then it will switch the stream into flowing mode, and data will be passed to your handler as soon as it is available.
That is, attaching a data listener reverts the stream to the classic streams mode. That is probably why the behavior you see is inconsistent with the rest of the documentation. To observe things unobtrusively, you could remove the on('data') handler and insert your own stream in between using through, like this:
var through = require('through');

var observer = through(function write(data) {
    console.log('Data!');
    this.queue(data);
}, function end() {
    this.queue(null);
});

request.pipe(observer).pipe(fstream);

Brilliant, I think you nailed it. I had read somewhere that attaching a data event switches the stream into "old mode", but I had not realized that prior to v0.10 the pause event was merely advisory rather than guaranteed. That seems to be the secret sauce here. So I believe you are exactly right: attaching a data event listener switches the stream to "old mode" for backwards compatibility, and in old mode the pause() method is advisory only. - twofifty6
