TLDR;
pipe
存在这些问题
pipeline
被创建来解决所有这些问题,它做到了
pipeline
非常适合拥有从头到尾的所有部分,但如果没有:
详细回答:
接受的答案只是概述了 pipeline
,但它专门设计用于解决这个问题。 pipe
绝对会遇到这个问题(下文有更多细节),但我还没有遇到过 pipeline
无法正确关闭文件、http等流的情况。 对于随机的npm包,可能会有所不同,但如果它有一个close
或destroy
函数以及一个on('error'
事件,它应该没问题。
为了演示,这里调用shell命令来查看我们的测试文件是否打开:
const listOpenFiles = async () => {
const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");
const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
console.log('***** open files:\n', openFiles, '\n-------------');
};
如果你在上面的例子中在循环内调用它:
for await (const chunk of source) {
await listOpenFiles();
输出将会不断重复:
***** open files:
[
'/path/to/lowercase.txt',
'/path/to/uppercase.txt'
]
如果你在捕获异常后再次调用它,你会发现所有内容都已关闭。
***** open files:
[]
关于引用的文档:
在前两个要点中,pipeline
文档所提到的是它不会关闭已经关闭的流,因为它们已经关闭了。至于悬挂的监听器,这些确实留在传递给 pipeline
的各个流上。然而,在你的例子中(一个典型的情况),你并没有保留对各个流的引用;它们将在管道完成后立即被垃圾回收。这是在警告如果你有一个对其中一个流的常量引用可能会产生潜在副作用。
export const capitalizer = new Transform(
相反,最好拥有“干净”的实例。现在生成器函数很容易链接,甚至不必引用转换。但是,您可以简单地创建一个返回新实例而不是恒定实例的函数:
export const createCaptilizer = () => new Transform(
简而言之,上述示例在所有三个方面都很好。
pipe
的更多信息
另一方面,pipe
确实存在上述传播问题。
const csvStream = (file) => {
return fs.createReadStream(file).pipe(createCsvTransform());
};
普遍认为这样做很困难/不直观,但现在改变已经太晚了。我尽可能避免使用它,并建议在可能的情况下使用pipeline
。然而,需要注意的是,pipeline
需要所有部分都在一起。例如,在上面的例子中,你仍需要最终的Writable
目标。如果你只想构建链的一部分,仍必须使用pipe
。对此的解决方法更容易单独理解:
const csvStream = (file) => {
const fileStream = fs.createReadStream(file);
const transform = createCsvTransform();
fileStream.on('error', (error) => transform.emit('error', error));
transform.on('error', () => fileStream.close());
return transform;
}
然而,有好消息。虽然仍处于实验阶段,但很快 stream 将公开一个 stream.compose
函数。它具备 pipeline 的所有传播/清理优势,但只返回一个新的 stream。本质上,它就是大多数人认为 pipe
应该做的事情。 ;)
readable.pipe(transform);
stream.compose(readable, transform);
在那之前,可以看看 https://www.npmjs.com/package/stream-chain
有关pipeline
和await
的说明
请注意上面的示例使用了await pipeline(//...
,但链接文档是同步版本。它不会返回一个promise,所以await
没有任何效果。从 node 15 开始,通常应该使用 stream/promises
api: https://nodejs.org/api/stream.html#streams-promises-api
import { pipeline } from 'stream/promises';
在节点15之前,您可以使用util的promisify将其转换为Promise:
import { pipeline } from 'stream';
import { promisify } from 'util';
await promisify(pipeline)(
或者,为了简化整个文件:
import * as stream from 'stream';
import { promisify } from 'util';
const pipeline = promisify(stream.pipeline);
我之所以提到这点,是因为如果您在同步版本中使用await
,它实际上不会在try/catch
之后完成,因此可能会给人错误的印象,认为它未能清理,而事实上,它尚未完成。
pipeline()
的原因。我想我只需要在自己能够完全控制错误处理的地方包装自己的承诺即可。我是那种宁愿自己编写代码,也不愿意与预先制作但文档不良的代码搏斗的人。你知道的魔鬼和控制权对比于你不理解如何正确使用的预制代码。 - jfriend00