我想要实现以下功能:
getPaths
函数读取目录路径,并在找到它们时将它们推入readable
流中readable
流在接收到路径后,将其作为输入管道(streaming)到write
流中。
代码
const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')
let wstream = fs.createWriteStream('C:/test/file.txt.gz')
let readable = new Stream.Readable({
objectMode: true,
read(item) {
this.push(item)
}
})
readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
console.log('done');
})
let walkdir = require('walkdir')
function getPaths(dir) {
let walker = walkdir.sync(dir, {"max_depth": 0, "track_inodes": true}, (path, stat) => {
readable.push(path)
console.log('pushing a path to readable')
})
}
getPaths("C:/")
console.log('getPaths() ran')
readable.push(null) // indicates the end of the stream
问题
路径并未被压缩,并在getPaths
函数发现并将它们推入流中时写入文件,只有在找到所有路径后才会写入。我知道这可能是因为进程是同步的,但无法弄清楚如何使其工作。
从日志输出中我看到以下内容:
> // .gz file gets created with size of 0
> // Nothing happens for about 1 minute
> x(184206803) "pushing a path to readable"
> "getPaths() ran"
> // I see the data started being written into the file
> "Done"
更新:
如果我像下面这样异步执行代码(或者使用下面答案中的代码):
let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
walker.on('path', (path, stat) => {
readable.push(path)
})
walker.on('end', (path, stat) => {
readable.push(null)
})
...
// readable.push(null)
当我将数据传送到readable流中之后,如果没有收到预期的数据块,则会出现错误。
如果从代码中删除最后一行 readable.push(null)
,并尝试再次运行代码,则会抛出相同的错误。
TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be one of type
string or Buffer. Received type number
walkdir
,但我想象如果那里有一个async
函数,它可能会解决问题。 - Amitreadable
会抛出一个错误,说它收到了一个数字而不是缓冲区。我认为当你写完后没有将null
推入其中时,它会抛出此错误。我现在要编辑问题并添加这些信息。 - Un1