Node.js如何将流复制到文件中而不会消耗它?

6

给定一个解析输入流的函数:

async onData(stream, callback) {
    const parsed = await simpleParser(stream)

    // Code handling parsed stream here
    // ...

    return callback()
}

我正在寻找一种简单而安全的方法来“复制”该流,以便我可以将其保存到文件进行调试,而不影响代码。这种方式可行吗?

在伪代码中相同的问题:我正在尝试做一些类似于这样的事情。显然,这是一个虚构的例子,并且不能正常工作。

const fs = require('fs')
const wstream = fs.createWriteStream('debug.log')

async onData(stream, callback) {
    const debugStream = stream.clone(stream) // Fake code
    wstream.write(debugStream)

    const parsed = await simpleParser(stream)

    // Code handling parsed stream here
    // ...

    wstream.end()

    return callback()
}

为什么要克隆流,既然您仍然可以再次从中读取? - 0.sh
@0.sh 效率。 - Redsandro
如果您没有调用 stream.close(),那么就不需要克隆流。 - 0.sh
@0.sh 真的这么简单吗?我以为我需要像cloneable-readable这样的东西(我没有在我的答案中包含它,以防止影响我得到的答案)。 - Redsandro
@0.sh 实际上,您无法多次从同一流中读取,因为只要第一个读取完成,流就会关闭,所有其他读取都将不完整。 - Ivan Cherviakov
我尝试了这个答案,它对我起作用了。 - Ivan Cherviakov
2个回答

11
不,你不能在不消耗的情况下克隆一个可读流。然而,你可以将它分别传输两次,一次用于创建文件,另一次用于“克隆”。
以下是代码:
let Readable = require('stream').Readable;
var stream = require('stream')

// original stream, maybe from your parser or network
var s = new Readable()
s.push('beep')
s.push(null)  

// here use stream1 for normal usage, like creating file, 
// and use stream2 for debugging, like a cloned stream.
var stream1 = s.pipe(new stream.PassThrough())
var stream2 = s.pipe(new stream.PassThrough())

// I just print them out for a quick show
stream1.pipe(process.stdout)
stream2.pipe(process.stdout)

-1

我尝试了@jiajianrong提供的解决方案,但是在使用createReadStream时遇到了困难,因为当我直接推送createReadStream时,Readable会抛出错误。就像这样:

s.push(createReadStream())

为了解决这个问题,我使用了一个辅助函数将流转换成缓冲区。
function streamToBuffer (stream: any) {
  const chunks: Buffer[] = []
  return new Promise((resolve, reject) => {
    stream.on('data', (chunk: any) => chunks.push(Buffer.from(chunk)))
    stream.on('error', (err: any) => reject(err))
    stream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}

使用一个管道生成流的哈希值,另一个管道将流上传到云存储中。下面是我找到的解决方案。
import stream from 'stream'
const Readable = require('stream').Readable

const s = new Readable()
s.push(await streamToBuffer(createReadStream()))
s.push(null)

const fileStreamForHash = s.pipe(new stream.PassThrough())
const fileStreamForUpload = s.pipe(new stream.PassThrough())

// Generating file hash
const fileHash = await getHashFromStream(fileStreamForHash)

// Uploading stream to cloud storage
await BlobStorage.upload(fileName, fileStreamForUpload)

我的回答主要基于jiajianrong的答案。


把流缓冲然后推送到另一个流毫无意义,你可以直接将流从createReadStream管道到两个下游。之所以出错是因为你不能把流推送到其他流里,这就是管道的作用。@jiajianrong 在他的示例中的 Readable 只是一个示例;它是一个虚拟流,里面有些内容来演示.. 你已经有了流(从 createReadStream 获取回来的),你的 s 就是创建的读取流。 - gabriel.hayes
@gabriel.hayes - 嗨 Gabriel,感谢你的反馈,我会试一试。我会尽快发布我的结果。 - DanielHefti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接