Node.JS中的createReadStream

10

所以我使用了 fs.readFile(),它给了我一个

"FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory"

因为 fs.readFile() 在调用回调函数之前会将整个文件加载到内存中,所以我应该改用 fs.createReadStream() 吗?

这就是我之前使用 readFile 的方式:

fs.readFile('myfile.json', function (err1, data) {
    if (err1) {
        console.error(err1);
    } else {
        var myData = JSON.parse(data);
        //Do some operation on myData here
    }
}

抱歉,我对流媒体还比较新,以下方式是否是使用流媒体完成同样的事情的正确方式?

var readStream = fs.createReadStream('myfile.json');

readStream.on('end', function () {  
    readStream.close();
    var myData = JSON.parse(readStream);
    //Do some operation on myData here
});

谢谢

1个回答

16
如果文件很大,那么是的,流式处理是你想要处理它的方式。然而,在你的第二个例子中,你让流将所有文件数据缓冲到内存中,然后在 end 上处理它。从本质上讲,这与使用 readFile 没有什么区别。
你需要查看一下 JSONStream。流式处理意味着你希望按照数据流的方式处理数据。在你的情况下,你显然必须这样做,因为你无法一次性将整个文件缓冲到内存中。考虑到这一点,希望像这样的代码有意义:
JSONStream.parse('rows.*.doc')

注意它具有某种查询模式。这是因为您将不会一次性从文件中获得整个JSON对象/数组,所以您必须更多地考虑JSONStream如何处理在其找到数据时的数据,

您可以使用JSONStream从本质上查询您感兴趣的JSON数据。这样,您永远不会将整个文件缓冲到内存中。它的缺点是,如果您确实需要所有数据,则必须多次流式传输文件,仅在那时使用JSONStream提取您需要的数据,但在您的情况下,您没有太多选择。

您还可以使用JSONStream按顺序解析数据,并执行类似将其转储到数据库的操作。

JSONStream.parse类似于JSON.parse,但它返回一个流而不是整个对象。当解析流获取足够的数据以形成与您的查询匹配的整个对象时,它将发出一个data事件,其中数据是与您的查询相匹配的文档。一旦配置了数据处理程序,您就可以将读取流导入解析流并看着魔术发生。

示例:

var JSONStream = require('JSONStream');
var readStream = fs.createReadStream('myfile.json');
var parseStream = JSONStream.parse('rows.*.doc');
parseStream.on('data', function (doc) {
  db.insert(doc); // pseudo-code for inserting doc into a pretend database.
});
readStream.pipe(parseStream);

这是一种冗长的方式帮助你理解正在发生的事情。以下是一种更为简洁的方式:

var JSONStream = require('JSONStream');
fs.createReadStream('myfile.json')
  .pipe(JSONStream.parse('rows.*.doc'))
  .on('data', function (doc) {
    db.insert(doc);
  });

编辑:

为了更好地理解正在发生的事情,请试着这样想。假设你有一个巨大的湖,你想处理水以净化它并将水移动到新的水库中。如果你有一架巨大的魔法直升机和一个巨大的桶,那么你可以飞越湖泊,把湖水放进桶里,加入处理化学物质,然后将其飞到目的地。

问题在于,并没有这样的直升机能够处理那么多的重量或容积。这是不可能的,但这并不意味着我们不能用不同的方式实现我们的目标。所以你要建造一系列河流(流)在湖泊和新水库之间。然后在这些河流中设置净化站,净化任何通过它的水。这些站点可以以各种方式运作。也许处理速度很快,你可以让河流自由流动,水在最大速度下通过流动时净化就会自然发生。

水的处理可能需要一些时间,或者站点需要一定数量的水才能有效地处理它。因此,你设计你的河流具有闸门,控制着从湖泊流入你的河流的水流量,让站点缓冲所需的水,直到它们完成工作并释放净化后的水向下流入最终目的地。

这就是你想用数据实现的几乎完全相同的操作。解析流就像你的净化站一样,它会缓冲数据直到有足够的数据来形成一个匹配你的查询的完整文档,然后将仅该数据推送到下游(并发出 data 事件)。

Node 流非常好用,因为大多数情况下,你不必处理开启和关闭闸门的问题。Node 流足够智能,当流缓冲一定量的数据时,可以控制回流。就好像净化站和湖泊上的闸门正在进行交流,以确定最佳的流速。

如果你有一个流式数据库驱动程序,那么理论上你应该能够创建某种插入流,然后执行 parseStream.pipe(insertStream) 而不是手动处理 data 事件:D。这里是在另一个文件中创建你的 JSON 文件的过滤版本的示例。

fs.createReadStream('myfile.json')
  .pipe(JSONStream.parse('rows.*.doc'))
  .pipe(JSONStream.stringify())
  .pipe(fs.createWriteStream('filtered-myfile.json'));

1
非常感谢您的帮助!这是否意味着 db.insert(doc) 只被调用一次,那时您就拥有了所有数据?再次感谢。 - user3421904
不,它将针对您提供给JSONStream.parse的每个匹配查询调用。该事件将为每个匹配项发出。查看他们的示例,并假装我们正在解析他们使用的示例JSON blob。 假设这是我们要解析的数据,您将获得恰好两个发出的“data”事件。分别针对“rows”数组中两行中的每个“doc”属性。有意义吗? - Chev
如果您的文件太大,在尝试加载它时出现“内存不足”错误提示,则需要理解您将永远无法一次性获取所有数据。您需要通过 JSONStream 解析流来流式传输文件,只有在需要读取数据时才查询当前所需内容。如果这是个问题,那我强烈建议拆分这个庞大的 JSON 文件 ;)。 - Chev
3
没问题,如果下面的比喻有助于使事情更清晰,我已经加入了。如果它对你有帮助,请不要忘记接受答案 :) - Chev
2
值得更多的点赞!解释得很好,特别是那些“河流”。 - Steven

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接