从事件处理程序向WriteStream写入数据

3
我有一个对象EventEmitter,我已经设置成监听事件。当事件被触发时,我想要将信息写入文件。我通过fs.createWriteStream(path, { flags: 'a' }) 打开了一个FileStream。目前我的问题在于,如果我非常频繁地触发事件,我就会开始“积压”。也就是说,.write返回false,要求我暂停写入一段时间。由于我是在事件处理程序中进行写入的,所以附近没有回调函数可用来指示写入过程的结束。从处理或触发方面,我该如何防止积压呢?
最终,似乎并不重要;所有数据都被写入了文件。但是我想尽可能遵循“规则”。
我知道我可以监听drain事件,在此之后重新开始写入,但是我如何防止其他事件进入处理程序?我注意到,如果我在每次触发之前都有50ms的延迟,积压似乎就不会发生,但这似乎有点像黑客行为。而且,如果您的硬盘速度较慢怎么办?
下面是我的情况示例:
var ee = new EventEmitter();
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} );

ee.on('report', function (i) {
  stream.write('new file data ' + i + ' --- '  + Date.now + '\n');
});

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i)
}

以下并非完整代码,但是这是它的主要内容。完整的代码会在运行HTTP服务器时发送响应,但是如果我通过for循环等方式排队了1000个请求,就会出现上述情况。

2个回答

1

实际上,我最终使用读写流找到了一个更简单的解决方案来解决这个问题。请参考下面的代码示例。

var stream = require('stream');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;

var ee = new EventEmitter();
var writeStream = fs.createWriteStream('./file/log.txt', { flags: 'a', end: false } );
var readStream = new stream.Readable();
// This needs to be here for compatibility reasons, but is intentionally a no-op
readStream._read = function() {};

ee.on('report', function (i) {
  readStream.push(i.toString());
});

readStream.pipe(writeStream);

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i);
}

这将使Node的管道和流系统与操作系统协调处理背压。在我看来,这是解决该问题的首选方法。

你应该将变量重命名以与函数调用相匹配。例如,readStream 实际上应该是 writeStream,因为这就是你正在创建的,而且你正在向可读流中推送数据,而不是写入流,并且接着从可读流到可写流进行传输。如果你修复了它,我会取消我的反对票并替换为支持票。 - ciso
我不认为这需要一个负评,但代码确实存在问题,所以我修复了它。 - arb
已点赞。您的命名方式令人困惑和误导。现在看起来好多了。谢谢。 - ciso

0

处理这个问题的理想方式是使用pause()暂停传入事件,如果事件来自流或以某种方式可暂停,则可以这样做,但这并非总是可能的。

如果无法以某种方式暂停传入事件,则我通常使用queue函数来处理,该函数属于async模块。当然,还有很多其他方法可以解决这个问题,但使用队列是我发现最简单的方法,而async模块(适用于许多异步操作)提供了一个很好的解决方案。

基本思路是将所有的write调用放入一个队列中,该队列配置为一次只处理1个任务。如果从您的stream.write调用返回false,则暂停queue。一旦从您的stream接收到drain事件,就再次resume()队列。这样,您就不会在流饱和时写入stream,但仍然可以接收事件并将其排队等待stream准备好它们。
使用示例代码进行操作如下:
var async = require('async');

var ee = new EventEmitter();
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} );

// Create a queue with a concurrency of 1
var writeQueue = async.queue(function(data, callback) {
    if (!stream.write(data)) {
        // if write() returns false, it's saturated; pause the queue
        writeQueue.pause();
    }
    callback();
}, 1); // <-- concurrency argument here; it's easy to miss ;)

stream.on('drain', function() {
    // the stream isn't saturated anymore; resume the queue
    writeQueue.resume();
})

ee.on('report', function (i) {
    // instead of writing directly to the stream, push data to the writeQueue
    writeQueue.push('new file data ' + i + ' --- '  + Date.now() + '\n');
});

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i)
}

注意: 这与让流在内部缓冲数据并没有太大区别。你仍然在缓冲数据,只不过是自己来控制情况。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接