如何在函数内向可读流(readable stream)推送数据?

5

我想要实现以下功能:

  • getPaths函数读取目录路径,并在找到它们时将它们推入readable流中
  • readable流在接收到路径后,将其作为输入管道(streaming)到write流中。

代码

const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
let readable = new Stream.Readable({
  objectMode: true,
  read(item) {
    this.push(item)
  }
})

readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir.sync(dir, {"max_depth": 0, "track_inodes": true}, (path, stat) => {
    readable.push(path)
    console.log('pushing a path to readable')
  }) 
}
getPaths("C:/")
console.log('getPaths() ran')
readable.push(null)  // indicates the end of the stream

问题

路径并未被压缩,并在getPaths函数发现并将它们推入流中时写入文件,只有在找到所有路径后才会写入。我知道这可能是因为进程是同步的,但无法弄清楚如何使其工作。

从日志输出中我看到以下内容:

> // .gz file gets created with size of 0
> // Nothing happens for about 1 minute
> x(184206803) "pushing a path to readable"
> "getPaths() ran"
> // I see the data started being written into the file
> "Done"

更新:

如果我像下面这样异步执行代码(或者使用下面答案中的代码):

let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 

  ...

  // readable.push(null) 

当我将数据传送到readable流中之后,如果没有收到预期的数据块,则会出现错误。

如果从代码中删除最后一行 readable.push(null) ,并尝试再次运行代码,则会抛出相同的错误。

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be one of type
 string or Buffer. Received type number

我不熟悉 walkdir,但我想象如果那里有一个 async 函数,它可能会解决问题。 - Amit
@Amit 我实际上尝试过这个,但是readable会抛出一个错误,说它收到了一个数字而不是缓冲区。我认为当你写完后没有将null推入其中时,它会抛出此错误。我现在要编辑问题并添加这些信息。 - Un1
我建议您在推送之前记录您所推送的内容,并查看发生了什么(或者最好是进行调试)。 - Amit
@Amit,问题是它将路径发送到“readable”,但它们会被卡在那里直到函数运行。因为我可以清楚地看到文件在大约1分钟后被写入磁盘(在完成读取所有路径并将它们推送到“readable”之后)。 - Un1
@Amit 我编辑了问题并添加了一个过程中发生的日志。 - Un1
1个回答

1
你的代码非常好,运行得很好。你只需要删除this.push(item)并将read函数设置为空即可。
这是一个有效的片段。
const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
let readable = new Stream.Readable({
  objectMode: true,
  read() { }
})

readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 
}
getPaths("C:/")
console.log('getPaths() ran')

顺便提一下,正确的参数名是read(size)。它代表要读取的字节数量

编辑 不需要可读流。您可以直接写入zip文件。

const fs = require('fs');
const zlib = require('zlib');
const zip = zlib.createGzip();
const wstream = fs.createWriteStream('C:/test/file.txt.gz');

zip.pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    zip.write(path);
  })
  walker.on('end', (path, stat) => {
    zip.end();
  })
}
getPaths("C:/")
console.log('getPaths() ran')

谢谢您的回答。不幸的是,它仍然无法正常工作。 如果您扫描一个大文件夹(如'C:/’),您会发现它会获取所有路径并将其存储在内存中,只有在找到所有路径后,它才开始将其引导到“.gz”文件中。 但我尝试让它一边找到路径就一边将其写入“.gz”文件,以便不必将它们全部存储在内存中。 - Un1
我无法确定为什么会发生这种情况。在我看来,walkdir 模块似乎会占用大量 CPU 资源,并导致文件写入延迟,但我仍然不确定。 - Avraham
我尝试使用fork来处理walkdir进程,效果有所改善,但似乎文件系统仍然不堪重负(当我将walkdir输出记录到屏幕时,它会缓冲所有内容,而没有日志则会在读取所有文件之前开始写入文件)。 - Avraham
好的,我尝试使用自定义函数(使用fs.readdirSync)获取和写入路径,它减少了内存使用量,因为我不存储任何路径,只是在找到它们时将它们写入,但是zip仍然只在函数找到所有路径后才将路径写入文件。我不知道,也许这是同步问题。无论如何,感谢你的尝试,伙计。 - Un1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接