NodeJS:如何使用readStream编写文件解析器?

3

我有一个二进制格式的文件:

格式如下:

[4个字节的头] [8个字节的int64,表示接下来要读取多少字节] [可变字节数(与int64大小相同) - 读取实际信息]

然后它会重复,所以我必须先读取前12个字节以确定我需要读取多少字节。

我已经尝试过:

var readStream = fs.createReadStream('/path/to/file.bin');
readStream.on('data', function(chunk) {  ...  })

我遇到的问题是,每次chunk都以65536字节为一组返回,而我需要更具体地了解我正在读取的字节数。
我曾经尝试过readStream.on('readable', function() { readStream.read(4) }),但这种方法也不太灵活,因为它似乎将异步代码变成了同步代码,因为我必须将“读取”的操作放在while循环中。
或者在这种情况下,也许使用fs.read(fd, buffer, offset, length, position, callback)会更合适?
2个回答

1

以下是我建议的一个抽象处理程序,用于处理像您描述的那样的抽象数据的readStream:

var pending = new Buffer(9999999);
var cursor = 0;
stream.on('data', function(d) {
  d.copy(pending, cursor);
  cursor += d.length;

  var test = attemptToParse(pending.slice(0, cursor));
  while (test !== false) {
    // test is a valid blob of data
    processTheThing(test);

    var rawSize = test.raw.length; // How many bytes of data did the blob actually take up?
    pending.copy(pending.copy, 0, rawSize, cursor); // Copy the data after the valid blob to the beginning of the pending buffer
    cursor -= rawSize;
    test = attemptToParse(pending.slice(0, cursor)); // Is there more than one valid blob of data in this chunk? Keep processing if so
  }
});

针对您的使用情况,请确保 pending 缓冲区的初始化大小足够大,以容纳您将解析的最大可能有效数据块(您提到了 int64;此最大大小加上标头大小)以及额外的 65536 字节,以防止边缘处的流块正好位于数据块边界。

我的方法需要一个 attemptToParse() 方法,它接受一个缓冲区并尝试从中解析数据。如果缓冲区长度太短(数据还没有足够进来),则应返回 false。如果是有效对象,则应返回一些已解析对象,其中包含一种显示其占用原始字节的方式(例如我的示例中的 .raw 属性)。然后进行任何必要的数据处理(processTheThing()),删除该数据块中的有效数据,将挂起的缓冲区移位为其余部分,并继续进行下一个数据块的处理。这样,您就不需要不断增长的 pending 缓冲区或一些“完成”的数据块数组。也许在 processTheThing() 的接收端上进行的处理将数据块存储在内存中的数组中,也许将其写入数据库,但在此示例中,这些过程被抽象化了,因此这段代码只处理如何处理流数据。


有趣的是,尽管我们的代码执行非常相似的操作(在缓冲区中收集数据块),但代码和方法调用却完全不同。例如,d.copy()Buffer.concat() - user949300
正确;在这种情况下,我避免使用Buffer.concat(),因为没有必要处理数组和缓冲区;将增长的缓冲区包装在数组中只会增加一些开销,而且如果没有给出长度,concat()方法说它必须循环两次缓冲区,所以可能使用copy()方法,并使用指针来保持位置是更好的性能方法,尽管我还没有确定。 - MidnightLightning

0
将块添加到缓冲区,然后从那里解析数据。请注意不要超出缓冲区的末尾(如果您的数据很大)。我现在正在使用我的平板电脑,所以无法添加任何示例源代码。也许其他人可以?
好的,这是一个非常简单的源代码。
var chunks = [];
var bytesRead= 0;

stream.on('data', function(chunk) {

   chunks.push(chunk);
   bytesRead += chunk.length;

   // look at bytesRead...
   var buffer = Buffer.concat(chunks);
   chunks = [buffer];  // trick for next event
      // --> or, if memory is an issue, remove completed data from the beginning of chunks
   // work with the buffer here...

}

你如何将块添加到缓冲区?我需要在某个地方保留缓冲变量并等待下一个数据事件吗?例如,如果我需要读取12个字节,然后发现我需要读取100k字节,而块的长度为60k,我是否需要等待下一个数据事件来将其附加到缓冲区中。 - samol
如果数据量为100k,您将需要等待更多的数据事件。希望这能帮助您入门。 - user949300
所以块将会不断增长成一个巨大的数组吗? - samol
在这个简单的例子中,是的。更聪明的代码将从chunks[]开头删除已完成的数据。(并保留偏移量) - user949300

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接