如何关闭可读流(在结束之前)?

132

如何在 Node.js 中关闭一个可读流

var input = fs.createReadStream('lines.txt');

input.on('data', function(data) {
   // after closing the stream, this will not
   // be called again

   if (gotFirstLine) {
      // close this stream and continue the
      // instructions from this if
      console.log("Closed.");
   }
});

这将比以下内容更好:

input.on('data', function(data) {
   if (isEnded) { return; }

   if (gotFirstLine) {
      isEnded = true;
      console.log("Closed.");
   }
});

但这并不能阻止阅读过程...


7
警告:此问题仅涉及“fs”模块。在“Stream.Readable”中不存在“close”函数。 - zamnuts
4
好消息。Node版本8提供了stream.destroy()方法。 - joeytwiddle
你不能调用 readable.push(null) && readable.destroy(); 吗? - Alexander Mills
10个回答

118

编辑:好消息!从Node.js 8.0.0开始,readable.destroy已正式发布:https://nodejs.org/api/stream.html#stream_readable_destroy_error

ReadStream.destroy

您可以随时调用 ReadStream.destroy 函数。
var fs = require("fs");

var readStream = fs.createReadStream("lines.txt");
readStream
    .on("data", function (chunk) {
        console.log(chunk);
        readStream.destroy();
    })
    .on("end", function () {
        // This may not been called since we are destroying the stream
        // the first time "data" event is received
        console.log("All the data in the file has been read");
    })
    .on("close", function (err) {
        console.log("Stream has been destroyed and file has been closed");
    });

公共函数ReadStream.destroy没有文档记录(Node.js v0.12.2),但是你可以查看GitHub上的源代码2012年10月5日提交)。 destroy函数在内部将ReadStream实例标记为已销毁,并调用close函数释放文件。
你可以监听close事件来确定文件何时关闭。只有当数据完全消耗时,end事件才会触发。
请注意,destroy(以及close)函数仅适用于fs.ReadStream。它们不是通用的stream.readable“接口”的一部分。

至少在最新版本的Node中(还没有检查其他版本),文件描述符会自动关闭。话虽如此,我并没有进行彻底的测试来确保如果流从未被读取,则最终会触发error事件。除此之外,我唯一担心的是事件处理程序泄漏。再次强调,我不能百分之百确定,但根据2010年Isaacs的圣经所言,当发射器被垃圾回收时,处理程序会被修剪:https://groups.google.com/d/msg/nodejs/pXbJVo0NtaY/BxUmF_jp9LkJ - mikermcneil
1
如果数据太小,on('data') 只会触发一次,因此不会有 .close(),只需提醒其他人即可。 - bytefish
1
你其实可以使用 this.destroy(),除非你使用箭头函数。词法作用域中的 this,我恨你 :D - Moha the almighty camel
6
我正在使用 pipe(),我必须将 on("close") 移动到 .pipe()on("data") 之前,否则无法捕获 "close" 事件。 - Maxim Mazurok
@MaximMazurok 兄弟,你是救星啊!这问题我纠结了好几个小时了,谢谢! - Fazal Karim

46

调用 input.close()。虽然没有在文档中记录,但是https://github.com/joyent/node/blob/cfcb1de130867197cbc9c6012b7e84e08e53d032/lib/fs.js#L1597-L1620清楚地完成了这项工作 :) 它实际上执行与您的isEnded类似的操作。

编辑2015年4月19日:根据下面的评论澄清和更新:

  • 此建议是一种黑客行为,并未记录在案。
  • 尽管查看当前的lib/fs.js后,它仍然有效超过1.5年。
  • 我同意下面关于调用destroy()更可取的评论。
  • 如下所述,这适用于fs ReadStreams,而不适用于通用的Readable

至于通用解决方案:根据我的理解文档以及快速查看_stream_readable.js,似乎没有这样的解决方案。

我的建议是将可读流放置在暂停模式下,至少可以防止上游数据源中的进一步处理。不要忘记unpipe()并删除所有data事件侦听器,以使pause()实际上暂停,如文档所述。


1
实际上,我更喜欢调用 destroy。至少如果将 autoClose 设置为 true,就会这样调用。通过查看源代码(今天),差异很小(destroy 调用 close),但这在未来可能会改变。 - Marcelo Diniz
现在不记得了,但看起来是这样的 :) - Nitzan Shaked
4
对象 Readable 没有 close() 方法,是否有其他解决方案?我的数据交换总是不完整... - CodeManX
更新以澄清、回应评论,并提供一个(穷人版的)通用情况建议。虽然不强制要求通用的“可读”实现close(),并提供一种类特定的方法来实现这一点(就像在fs中一样,以及其他实现Readable的类中可能也是如此)。 - Nitzan Shaked
暂停不会导致上游(发送方)因背压而阻塞,或者导致缓冲区增长直到超过其限制吗?理想情况下,我们应该告诉发送方它不再需要发送数据。 - joeytwiddle

27

14

在版本4.*.*中,将 null 值推入流中将触发 EOF 信号。

来自 Node.js 文档

如果传递的是除 null 以外的值,则 push() 方法会向队列中添加一块数据,以供后续的流处理器消耗。如果传递的是 null,则表示流结束(EOF),此后不能再写入任何数据。

在此页面尝试了许多其他选项后,这对我有效。


1
对我来说可以工作。但是,我需要避免在推送null后调用done()回调以获得预期的行为 - 即整个流停止。 - Rich Apodaca

14
你不能关闭/中止/终止/销毁一个通用的Readable流,截至Node 5.3.0,没有文档记录的方法可以实现。这是Node流架构的局限性。
正如其他答案所解释的那样,对于Node提供的特定Readable实现,例如fs.ReadStream,有一些未记录的黑科技方法。然而,这些不是针对任何Readable的通用解决方案。
如果有人能证明我错了,请务必告诉我。我想做一些我认为不可能做到的事情,并会很高兴接受纠正。
编辑:这是我的解决方法:通过一系列复杂的unpipe()调用为我的管道实现.destroy() 而在所有这些复杂性之后,它并不在所有情况下都有效
编辑:Node v8.0.0添加了一个destroy() api用于Readable流

1
现在有 stream.pipeline,它声称可以处理“转发错误并正确清理并在管道完成时提供回调”。这有帮助吗? - andrewdotn

7

这个destroy模块的作用是确保流被销毁,处理不同的API和Node.js的错误。现在它是最好的选择之一。

NB. 从Node 10开始,您可以使用.destroy方法而无需进一步依赖。


3

这是一个老问题,但我也在寻找答案,并为我的实现找到了最好的答案。因为endclose事件都会被触发,所以我认为这是最干净的解决方案。

在编写时稳定版本的node 4.4.*中,以下代码将解决问题:

var input = fs.createReadStream('lines.txt');

input.on('data', function(data) {
   if (gotFirstLine) {
      this.end(); // Simple isn't it?
      console.log("Closed.");
   }
});

如需详细解释,请参见:http://www.bennadel.com/blog/2692-you-have-to-explicitly-end-streams-after-pipes-break-in-node-js.htm

关于Node.js中管道断开后必须显式结束流的详细说明,请参考上述链接。

3
你可以使用yourstream.resume()来清空并关闭流,这将倒出 (dump) 流上的所有内容并最终关闭它。
根据官方文档:
readable.resume(): 返回值: this 此方法将使可读流恢复发出 'data' 事件。 此方法将把流切换到流动模式。如果你不想使用流中的数据,但是想获得其“end”事件,你可以调用 stream.resume() 来打开数据流。
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', () => {
  console.log('got to the end, but did not read anything');
});

这可以被称为“排空”流。在我们的情况下,当然我们有一个'data'事件监听器,但我们让它检查一个布尔值if (!ignoring) { ... },所以当我们排空流时它不会处理数据。ignoring = true; readable.resume(); - joeytwiddle
5
当然,这假设流最终会“结束”。并不是所有的流都是这样!(例如,每秒发送日期的永久流。) - joeytwiddle

2
这里的代码可以很好地完成任务:

function closeReadStream(stream) {
    if (!stream) return;
    if (stream.close) stream.close();
    else if (stream.destroy) stream.destroy();
}

writeStream.end() 是关闭 writeStream 的常用方法...


2
为什么你提到 .end() 是最常用的方法,但是你的代码却使用了 close 和 destroy,甚至没有使用 end? - Lucas B
1
我正在关闭一个readStream的示例...一个writeStream -- 使用.end - g00dnatur3

0

如果要在某个调用后停止回调执行,您需要使用带有特定进程ID的 process.kill。

const csv = require('csv-parser');
const fs = require('fs');

const filepath = "./demo.csv"
let readStream = fs.createReadStream(filepath, {
    autoClose: true,
});
let MAX_LINE = 0;


readStream.on('error', (e) => {
        console.log(e);
        console.log("error");
    })

    .pipe(csv())
    .on('data', (row) => {

        if (MAX_LINE == 2) {
            process.kill(process.pid, 'SIGTERM')
        }
        // console.log("not 2");
        MAX_LINE++
        console.log(row);
    })

    .on('end', () => {
        // handle end of CSV
        console.log("read done");
    }).on("close", function () {
        console.log("closed");
    })


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接