如何在Node.js中使用流(Writable)的drain事件

17

在Node.js中,我正在使用fs.createWriteStream方法将数据添加到本地文件中。在Node文档中,当使用fs.createWriteStream时,他们提到了drain事件,但我不理解它。

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);
在上面的代码中,我如何使用 drain 事件?下面的代码正确地使用了该事件吗?
var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}
4个回答

38
drain事件是指可写流的内部缓冲区已被清空时触发的事件。
仅当内部缓冲区的大小超过其highWaterMark属性时,才会发生这种情况。该属性是可写流的内部缓冲区中可以存储的最大数据字节数,直到停止从数据源读取数据为止。
这种情况的原因可能是涉及从一个流中读取数据源的设置比将数据写入另一个资源的速度更快。例如,考虑两个流:
var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

现在假设文件read存储在SSD上,读取速度为500MB/s,而write存储在HDD上,只能以150MB/s的速度写入。写入流将跟不上,开始将数据存储在内部缓冲区中。一旦缓冲区达到默认值16KB的highWaterMark,写入将开始返回false,并且流将在内部排队等待。一旦内部缓冲区的长度为0,drain事件就会被触发。

这就是drain的工作原理:

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

以下是 writeOrBuffer 函数中作为排水前提条件的内容:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;

为了了解如何使用 drain 事件,可以参考 Node.js 文档中的示例。

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

该函数的目标是向可写流写入1,000,000次。当变量ok被设置为true时,循环才会执行。在每次循环迭代中,ok的值会被设置为stream.write()的返回值,如果需要drain,则该函数返回false。如果ok变成了false,那么等待drain的事件处理程序将会暂停,直到触发事件后恢复写入。
就你的代码而言,你不需要使用drain事件,因为你只是在打开流之后立即写入一次。由于你还没有向流中写入任何东西,内部缓冲区为空,除非以16KB的大小写入块,否则drain事件不会触发。drain事件适用于多次写入比你可写流的highWaterMark设置更多数据的情况。

我一遍又一遍地对同一个流进行写操作。比如说,我的代码可能像这样。我的数据量有时可能非常大。 - sachin
在你的示例中,你只写了一次。请展示一下你实际上是如何将东西写入流中的,因为只写一次字符串永远不需要 drain 事件。 - hexacyanide
在写入流之前,我会先检查该流是否存在。如果存在,则会在其上进行写入操作;否则,我将创建一个新的流。我已更新了我的问题。 - sachin
在这个例子中,你仍然只写了一次流。请展示如何多次向流中写入内容以建立可以使用 drain 的上下文。 - hexacyanide
@FreeLightman writer 是任何可写流的实例。可以从它必须具有 write 方法和 drain 事件来推断。 - hexacyanide
显示剩余2条评论

15

假设你正在连接两个带宽非常不同的流,例如将本地文件上传到一个速度较慢的服务器。快速的文件流会比慢速的套接字流发出数据更快。

在这种情况下,Node.js会将数据存储在内存中,直到慢流有机会处理它。如果文件非常大,这可能会导致问题。

为了避免这种情况,当底层系统缓冲区已满时,Stream.write会返回false。如果停止写入,则流稍后会发出drain事件,表示系统缓冲区已清空,可以再次写入。

你可以使用pause/resume暂停/恢复可读流并控制其带宽。

更好的方法是,你可以使用readable.pipe(writable)来自动处理此问题。

编辑:你的代码中有一个错误:无论write返回什么,你的数据都已经被写入。因此你不需要重试它。在你的情况下,你正在写入data两次。

像这样做可以解决问题:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}

在 stream.once('drain', niceWrite); 中,我们可以用 on 替换 once 吗? - Rajat Aggarwal
如果数据包非常长,这个递归会导致堆栈溢出吗? - Marson Mao

5

这里是使用async/await版本

const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    await write(write_stream, current++)
  }
}

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73


2
这是一个使用Promise(async/await)进行速度优化的版本。调用者必须检查是否得到了一个promise,只有在这种情况下才需要调用await。在每次调用上执行await可能会将程序减慢3倍...
const write = (writer, data) => {
    // return a promise only when we get a drain
    if (!writer.write(data)) {
        return new Promise((resolve) => {
            writer.once('drain', resolve)
        })
    }
}

// usage
const run = async () => {
    const write_stream = fs.createWriteStream('...')
    const max = 1000000
    let current = 0
    while (current <= max) {
        const promise = write(write_stream, current++)
        // since drain happens rarely, awaiting each write call is really slow.
        if (promise) {
            // we got a drain event, therefore we wait
            await promise
        }
    }
}

我遇到了这个错误:MaxListenersExceededWarning: 可能检测到 EventEmitter 内存泄漏。已添加 11 个 drain 监听器。使用 emitter.setMaxListeners() 来增加限制。我正在循环中调用 write。有什么建议可以修复吗? - VPaul
@VPaul,你的循环必须是异步的,并等待write返回的Promise。 - Steven Lu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接