同时,我稍微修改了我们的代码,以记录我们从流中看到的某些事件。下面是我的代码和一些带有小型测试文件的日志示例输出。让我困惑的是,可读流接收到暂停事件,然后继续发出数据和暂停事件WITHOUT可写流发出drain事件。我完全错过了什么吗?一旦可读流被暂停,它如何在接收到drain之前继续发出数据事件呢?可写流还没有表明它已经准备好了,所以可读流不应该发送任何东西...对吧?
但是请看输出。前三个事件对我来说很有意义:数据、暂停、drain。然后下一个3个事件很好:数据、数据、暂停。但是然后它再次发出另一个数据和另一个暂停事件,最终在第9个事件之前获得drain。我不明白为什么会发生第7和第8个事件,因为直到第9个事件才会发生drain。然后在第9个事件之后,有一堆数据/暂停对,没有任何相应的drain。为什么?我期望的是一些数据事件,然后是暂停,然后NOTHING,直到发生drain事件--此时数据事件可以再次发生。在我看来,一旦发生暂停,就不应该再发生任何数据事件,直到触发drain事件。也许我仍然基本上误解了Node流的一些东西?
更新:文档没有提到可读流会发出暂停事件,但是他们提到了pause函数可用。假设当可写流返回false时调用此函数,并且我认为暂停函数将发出暂停事件。无论如何,如果调用pause(),文档似乎与我的世界观相吻合。请参见https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_stream_readable
这个测试是在我的开发机器上运行的(Ubuntu 14.04,带有Node v0.10.37)。我们在生产环境中的EC2实例几乎相同。我认为他们现在运行v0.10.30。此方法将导致流在流动模式下停止发出数据事件。任何可用的数据将保留在内部缓冲区中。
S3Service.prototype.getFile = function(bucket, key, fileName) {
var deferred = Q.defer(),
self = this,
s3 = self.newS3(),
fstream = fs.createWriteStream(fileName),
shortname = _.last(fileName.split('/'));
logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);
// create a readable stream that will retrieve the file from S3
var request = s3.getObject({
Bucket: bucket,
Key: key
}).createReadStream();
// if network request errors out then we need to reject
request.on('error', function(err) {
logger.error(err, 'Error encountered on S3 network request');
deferred.reject(err);
})
.on('data', function() {
logger.info('data event from readable stream for [%s]', shortname);
})
.on('pause', function() {
logger.info('pause event from readable stream for [%s]', shortname);
});
// resolve when our writable stream closes, or reject if we get some error
fstream.on('close', function() {
logger.info('close event from writable stream for [%s] -- done writing file', shortname);
deferred.resolve();
})
.on('error', function(err) {
logger.error(err, 'Error encountered writing stream to [%s]', fileName);
deferred.reject(err);
})
.on('drain', function() {
logger.info('drain event from writable stream for [%s]', shortname);
});
// pipe the S3 request stream into a writable file stream
request.pipe(fstream);
return deferred.promise;
};
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [Feed