使用Node.js流处理错误

198

处理流错误的正确方法是什么?我已经知道可以监听'error'事件,但我想了解一些更复杂的情况的细节。

首先,在进行简单的管道链时,你该怎么做:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

如何正确地创建其中一个转换,以便正确处理错误?

更多相关问题:

  • 当出现错误时,“end”事件会发生什么情况?它永远不会被触发吗?有时会被触发吗?这取决于转换/流吗?这里有哪些标准?
  • 是否有任何机制通过管道传播错误?
  • 域是否有效地解决了这个问题?有示例会很好。
  • 'error'事件产生的错误是否具有堆栈跟踪?有时候?从中获取堆栈跟踪的方法是什么?

1
这并不是微不足道的。Promise框架使它变得简单许多。 - salezica
35
遗憾的是,承诺/未来值(promises/futures)无法真正帮助您处理流(streams)... - B T
9个回答

263

转换

转换流既可读又可写,因此它们是非常好的“中间”流。因此,它们有时被称为through流。它们在这方面类似于双工流,但提供了一个良好的接口来操作数据,而不仅仅是通过发送数据。转换流的目的是在流传输数据时对其进行操作。例如,您可能想要进行一些异步调用,或者派生一些字段,重新映射一些内容等。


Where you might put a transform stream


要创建一个转换流,请参见这里这里。您需要做的只是:
  1. 包含stream模块
  2. 实例化(或继承)Transform类
  3. 实现一个_transform方法,该方法接受(chunk,encoding,callback)

chunk是您的数据。如果您在objectMode = true中工作,则大多数时候无需担心编码问题。处理完成后将调用回调函数以将此块推到下一个流中。

如果您想要一个很好的助手模块来帮助您轻松地进行完全流操作,我建议使用through2

有关错误处理,请继续阅读。

管道

在管道链中,处理错误确实是非常棘手的。根据this thread所述,.pipe()不是为了转发错误而构建的。因此,类似于...
var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

...只会在流c上监听错误。如果a上发出了错误事件,它不会被传递下去,实际上会抛出异常。要正确处理这个问题:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

现在,虽然第二种方式更冗长,但至少你可以保留错误发生的上下文。这通常是一件好事。

有一个库我觉得很有用,如果你只想在目标位置捕获错误,而不太关心它发生在哪里,那就是event-stream

结束

当触发错误事件时,将不会显式地触发结束事件。发生错误事件将终止流。

根据我的经验,域在大多数情况下都非常有效。如果出现未处理的错误事件(即在没有监听器的流上发出错误),服务器可能会崩溃。如上文所指出的,你可以将流包装在一个域中,以正确捕获所有错误。

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });

域名的美妙之处在于它们可以保留堆栈跟踪。虽然 event-stream 也做得很好。

如需进一步阅读,请查看stream-handbook1。非常深入,但非常有用,并提供了许多有用模块的链接。

1: 注意:由于原始 GitHub 存储库在2022年8月左右被删除,此链接指向archive.org。


这是非常棒的信息,谢谢!您能否补充一下为什么要创建转换流以及它与我的问题有何关系? - B T
当然可以 - 不过我认为这与你的问题有关,因此才回答了它 ;) - mshell_lauren
1
这篇关于nodejs的帖子是由isaccs在Google Groups上发布的:https://groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ(不是grokbase)。 - jpillora
15
请注意,您无需将.on('error')处理程序包装在匿名函数中,即a.on('error', function(e){handleError(e)})可以简写为a.on('error', handleError) - timoxley
4
domain 模块正在被弃用: https://nodejs.org/api/domain.html - famzah
显示剩余3条评论

46

如果您使用的是Node.js版本大于等于v10.0.0,您可以使用stream.pipelinestream.finished

例如:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

请查看此GitHub PR以获得更多讨论。


2
当管道已经有回调函数时,为什么还要使用 finished 呢? - Marcos Pereira
5
在管道和单个流之间,您可能希望以不同的方式处理错误。 - shusson
pipeline 可以用于双工流吗?如果在双工流上使用,我们需要创建两个 pipeline 吗?我有两个 TCP 套接字需要相互连接,创建两个 pipeline 有意义吗? - CMCDragonkai
谢谢,pipeline() 正是我所寻找的。 - Sven Jacobs

26

域名已经不建议使用,您不需要它们。

对于这个问题而言,在转换或可写之间的区别并不是那么重要。

mshell_lauren的答案很好,但是作为替代方案,您也可以明确地监听您认为可能出错的每个流上的错误事件,并在需要时重用处理程序函数。

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

这样做可以防止臭名昭著的未捕获异常,如果这些流中的一个触发了其错误事件


5
祝你好运,处理三种不同的错误事件,并祈祷编写三个不同流库的人正确地实现了错误处理。 - Alexander Mills
6
1)处理三个类型相同的事件时,为什么它们存在“不同”的问题——即使这些事件的类型都是“错误”,也应该认识到每个事件都是独立的;2)除了原生Node.js功能之外,还有哪些流库是编写在其上面的?3)为什么内部如何处理事件很重要,尽管显然可以允许任何人在已有的错误处理程序之上附加其他错误处理程序? - Armen Michaeli
如果一个流出现错误,其他流是否也会关闭? - fishbone

13

使用一个简单的函数可以将整个链中的错误传播到最右侧的流。

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

可以像这样使用:

safePipe(readable, [ transform1, transform2, ... ]);

7

.on("error", handler) 只处理流错误,但如果您使用自定义转换流,则 .on("error", handler) 无法捕获发生在 _transform 函数内部的错误。因此,您可以像下面这样控制应用程序流程:

_transform 函数中的 this 关键字指的是流本身,它是一个 EventEmitter。因此,您可以像下面这样使用 try catch 来捕获错误,然后将其传递给自定义事件处理程序。

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

这样,您可以将逻辑和错误处理程序分开。此外,您可以选择处理一些错误并忽略其他错误。
更新:可选方案:RXJS Observable。

5
使用 multipipe 包将多个流组合成一个双工流,并在一个位置处理错误。
const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)

2
const http = require('http');
const fs = require('fs');
const server = http.createServer();

server.on('request',(req,res)=>{
    const readableStream = fs.createReadStream(__dirname+'/README.md');
    const writeableStream = fs.createWriteStream(__dirname+'/assets/test.txt');
    readableStream
    .on('error',()=>{
        res.end("File not found")
    })
    .pipe(writeableStream)
    .on('error',(error)=>{
        console.log(error)
        res.end("Something went to wrong!")
    })
    .on('finish',()=>{
        res.end("Done!")
    })
})

server.listen(8000,()=>{
    console.log("Server is running in 8000 port")
})

我非常确定这个程序没有正确捕获当管道到可写流时发生的错误。 - B T
@BT,根据您的评论,我已经编辑了我的帖子,现在我相信我们可以在将数据管道传输到可写流时处理任何错误。 - Soura Ghosh

2

通过创建转换流机制并调用它的回调done,使用Node.js模式来传播错误:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });

嗯,你的意思是说如果所有的流处理器都是这样构建的,错误就会传播吗? - B T
谢谢。对于流转换器,这似乎是正确的方法。错误通过stream.pipeline传播,可以非常容易地包装在一个Promise中。 - AWS User

-1

谢谢,但这根本没有回答问题。 - B T
给我一个40页的文档并不是有帮助的。在那么大的页面中,你认为我应该参考什么?另外,你有没有看过我的问题?我的问题不是“try-catch与流一起工作吗?”我已经很清楚了,try-catch无法处理异步错误,例如来自流处理管道的错误。 - B T

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接