错误[ERR_STREAM_PREMATURE_CLOSE]: Node管道流中的过早关闭

15

我正在使用Node的stream.pipeline功能将一些数据上传到S3。我正在实现的基本思想是从请求中提取文件并将它们写入S3。我有一个pipeline可以成功地从请求中提取zip文件并将其写入S3。但是,我希望我的第二个pipeline执行相同的请求,但解压缩并将未压缩的文件写入S3。管道代码如下所示:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))

s3Stream函数看起来是这样的:

function s3Stream(file) {
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}

The first pipeline works well, and is currently operating greatly in production. However, when adding the second pipeline, I get the following error:

第一个pipeline表现良好,并且目前在生产中运行良好。但是,当添加第二个pipeline时,我遇到了以下错误:
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
任何想法是什么导致这种情况或解决此问题的解决方案将不胜感激!
1个回答

10

简而言之

当使用管道时,您需要完全读取可读流,不希望在可读流结束之前停止任何内容。

深入探讨

在使用这些技巧一段时间后,这里有一些更有用的信息。

import stream from 'stream'

const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()

s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))

stream.pipeline(
    s1,
    s2,
    s3,
    async s => { for await (_ of s) { } },
    err => console.log('end', err)
)

现在如果我调用s2.end(),它将关闭所有父元素

end 2
close 2
end 3
close 3

pipeline相当于s3(s2(s1))

但如果我调用s2.destroy(),它会打印并销毁所有内容。这是您的问题,这里的流在正常结束之前被销毁了,可能是在asyncGenerator / asyncFunction中发生了错误或有返回/中断/抛出。

close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
    at PassThrough.emit (events.js:327:22)
    at emitCloseNT (internal/streams/destroy.js:81:10)
    at processTicksAndRejections (internal/process/task_queues.js:83:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3

不要让其中一个流没有捕捉错误的方法。

在调用回调函数后,stream.pipeline() 会在流上留下悬挂的事件监听器。如果在失败后重复使用流,这可能导致事件监听器泄漏和错误被吞噬。

node 源代码 (14.4)

  const onclose = () => {
    if (readable && !readableEnded) {
      if (!isReadableEnded(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    callback.call(stream);
  };

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接