Node - 在管道流结束后正确关闭流

6

假设我有以下代码:

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;
           
                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}

如何确保在所有情况下正确关闭流? Node文档 并没有提供帮助 - 它们只告诉我,可能会有悬空的事件监听器:

stream.pipeline()将在除以下情况外所有流上调用 stream.destroy(err):

已触发 'end' 或 'close' 的可读流。

已触发 'finish' 或 'close' 的可写流。

在回调被调用后,stream.pipeline()会在流上留下悬空的事件监听器。在流失败后重复使用的情况下,这可能会导致事件监听器泄漏和错误被吞噬。


3
我也注意到文档在这个主题上非常不完整,这就是为什么我从来没有使用过pipeline()的原因。我想我只需要在自己能够完全控制错误处理的地方包装自己的承诺即可。我是那种宁愿自己编写代码,也不愿意与预先制作但文档不良的代码搏斗的人。你知道的魔鬼和控制权对比于你不理解如何正确使用的预制代码。 - jfriend00
@jfriend00 我理解你的意思。如果可以的话,你能否使用 promises 和 event listeners 写出上面代码的一个版本呢?我会很感激并接受它作为答案 :) - Infamous911
2个回答

10

TLDR;

  • pipe存在这些问题
  • pipeline被创建来解决所有这些问题,它做到了
  • pipeline非常适合拥有从头到尾的所有部分,但如果没有:

详细回答:

接受的答案只是概述了 pipeline,但它专门设计用于解决这个问题。 pipe 绝对会遇到这个问题(下文有更多细节),但我还没有遇到过 pipeline 无法正确关闭文件、http等流的情况。 对于随机的npm包,可能会有所不同,但如果它有一个closedestroy函数以及一个on('error'事件,它应该没问题。

为了演示,这里调用shell命令来查看我们的测试文件是否打开:

const listOpenFiles = async () => {
  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");

  // only show our test files
  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
  console.log('***** open files:\n', openFiles, '\n-------------');
};

如果你在上面的例子中在循环内调用它:
for await (const chunk of source) {
  await listOpenFiles();

输出将会不断重复:
***** open files:
[
  '/path/to/lowercase.txt',
  '/path/to/uppercase.txt'
]

如果你在捕获异常后再次调用它,你会发现所有内容都已关闭。

***** open files:
 [] 

关于引用的文档:

在前两个要点中,pipeline 文档所提到的是它不会关闭已经关闭的流,因为它们已经关闭了。至于悬挂的监听器,这些确实留在传递给 pipeline 的各个流上。然而,在你的例子中(一个典型的情况),你并没有保留对各个流的引用;它们将在管道完成后立即被垃圾回收。这是在警告如果你有一个对其中一个流的常量引用可能会产生潜在副作用。

// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...

相反,最好拥有“干净”的实例。现在生成器函数很容易链接,甚至不必引用转换。但是,您可以简单地创建一个返回新实例而不是恒定实例的函数:

export const createCaptilizer = () => new Transform(// ...

简而言之,上述示例在所有三个方面都很好。

pipe 的更多信息

另一方面,pipe确实存在上述传播问题。

const csvStream = (file) => {
  // does not expose file errors, nor clean up the file stream on parsing errors!!!
  return fs.createReadStream(file).pipe(createCsvTransform());
};

普遍认为这样做很困难/不直观,但现在改变已经太晚了。我尽可能避免使用它,并建议在可能的情况下使用pipeline。然而,需要注意的是,pipeline需要所有部分都在一起。例如,在上面的例子中,你仍需要最终的Writable目标。如果你只想构建链的一部分,仍必须使用pipe。对此的解决方法更容易单独理解:

const csvStream = (file) => {
  const fileStream = fs.createReadStream(file);
  const transform = createCsvTransform();
  // pass file errors forward
  fileStream.on('error', (error) => transform.emit('error', error));
  // close file stream on parsing errors
  transform.on('error', () => fileStream.close());

  return transform;
}

然而,有好消息。虽然仍处于实验阶段,但很快 stream 将公开一个 stream.compose 函数。它具备 pipeline 的所有传播/清理优势,但只返回一个新的 stream。本质上,它就是大多数人认为 pipe 应该做的事情。 ;)

// NO propagation or cleanup
readable.pipe(transform);

// automatic propagation and cleanup
stream.compose(readable, transform);

在那之前,可以看看 https://www.npmjs.com/package/stream-chain

有关pipelineawait的说明

请注意上面的示例使用了await pipeline(//...,但链接文档是同步版本。它不会返回一个promise,所以await没有任何效果。从 node 15 开始,通常应该使用 stream/promises api: https://nodejs.org/api/stream.html#streams-promises-api

import { pipeline } from 'stream/promises'; // NOT 'stream'

在节点15之前,您可以使用util的promisify将其转换为Promise:
import { pipeline } from 'stream';
import { promisify } from 'util';

await promisify(pipeline)(// ...

或者,为了简化整个文件:

import * as stream from 'stream';
import { promisify } from 'util';

const pipeline = promisify(stream.pipeline);

我之所以提到这点,是因为如果您在同步版本中使用await,它实际上不会在try/catch之后完成,因此可能会给人错误的印象,认为它未能清理,而事实上,它尚未完成。


8

因此,我发现许多node.js流组合操作(例如pipeline().pipe())在错误处理方面非常糟糕/不完整。例如,如果您只是这样做:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });

你会期望如果打开 readStream 出现错误,你会在这里的错误处理程序中看到这个错误,但是实际情况并非如此。对于输入文件的打开错误将不被处理。有一定的逻辑在于 .pipe() 返回了输出流,而输入错误并不是输出流上的错误,但当它没有被传递时,很容易忽略输入流上的错误。.pipe() 操作可能会监听输入流上的错误,并通过一个错误传递它 (即使它是管道错误或其他不同的错误),然后当出现读取错误时也可正确地清理 writeStream。但是,.pipe() 没有完全实现这一点。看起来它似乎假设输入流上永远不会出现错误。

相反,你必须单独保存 readStream 对象并直接附加错误处理程序以便查看错误。因此,我不再信任这个复合的东西,文档也从未真正解释如何进行正确的错误处理。我尝试查看 pipeline() 的代码,以了解如何处理错误,但这并没有证明是一个成功的尝试。

所以,你的具体问题似乎可以使用一个转换流来解决:

const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);        
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});

正如你所看到的,大约三分之二的代码都在以健壮的方式处理错误(这是内建操作无法正确执行的部分)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接