如何在Node.js中对一个非常大(> 1GB)的文件的每一行运行异步函数

6

假设你拥有一个超过1GB的大型CSV文件,其中包含记录ID:

655453
4930285
493029
4930301
493031
...

对于每个id,您想通过REST API调用获取记录数据,本地转换并将其插入到本地数据库中。如何使用Node.js的可读流(Readable Stream)实现这一点?我的问题基本上是这样的:如何逐行读取一个非常大的文件,对每一行运行异步函数,并且[可选地]能够从特定行开始读取文件?以下是我从Quora问题中学习使用fs.createReadStream的内容:http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
var fs = require('fs');
var lazy = require('lazy');

var stream = fs.createReadStream(path, {
  flags: 'r',
  encoding: 'utf-8'
});

new lazy(stream).lines.forEach(function(line) {
  var id = line.toString();
  // pause stream
  stream.pause();
  // make async API call...
  makeAPICall(id, function() {
    // then resume to process next id
    stream.resume();
  });
});

但是,那个伪代码行不通,因为 lazy 模块 强制你读取整个文件(作为流,但没有暂停)。所以这种方法似乎行不通。
另一件事是,我想能够从特定行开始处理此文件。原因是,每个 id 的处理(调用 api、清理数据等)可能需要半秒钟的时间,因此我不想每次都从文件开头开始。我想使用的天真方法是只捕获上次处理的最后一个 id 的行号,并保存它。然后,当你再次解析文件时,逐行遍历所有 id,直到找到你停止的行号,然后进行 makeAPICall 业务。另一种天真的方法是编写小文件(比如 100 个 id),逐个处理每个文件(数据集小得足以在没有 IO 流的情况下全部放在内存中)。有更好的方法吗?
我可以看出这会变得棘手(也就是 node-lazy 可以发挥作用的地方),因为 stream.on('data', function(chunk) {}); 中的 chunk 可能只包含一行的 部分(如果缓冲区大小很小,每个块可能是 10 行,但由于 id 的可变长度,它可能只有 9.5 行或其他)。这就是我想知道上述问题的最佳方法的原因。

猜想这就是 Redis 和后台任务存在的目的了... - Lance
开始看起来很有前途:https://gist.github.com/2947293 - Lance
我曾经发布过一篇解决方案,用于解析非常大的文件,使用流进行同步操作。请参见:https://dev59.com/3WQo5IYBdhLWcg3wfPbK#23695940 - Gerard
2个回答

2

与安德鲁·安德烈·利斯托钦的回答有关:

您可以使用类似byline的模块,以获取每行单独的data事件。它是原始文件流周围的转换流,可为每个块生成data事件。这使您可以在每行后暂停。

byline不像lazy一样将整个文件读入内存。

var fs = require('fs');
var byline = require('byline');

var stream = fs.createReadStream('bigFile.txt');
stream.setEncoding('utf8');

// Comment out this line to see what the transform stream changes.
stream = byline.createStream(stream); 

// Write each line to the console with a delay.
stream.on('data', function(line) {
  // Pause until we're done processing this line.
  stream.pause();

  setTimeout(() => {
      console.log(line);

      // Resume processing.
      stream.resume();
  }, 200);
});

1

我猜你不需要使用node-lazy。这是我在Node文档中找到的:

Event: data

function (data) { }

The data event emits either a Buffer (by default) or a string if setEncoding() was used.

这意味着如果您在流上调用setEncoding(),则您的data事件回调将接受一个字符串参数。然后在此回调中,您可以调用.pause().resume()方法。

伪代码应该如下所示:

stream.setEncoding('utf8');
stream.addListener('data', function (line) {
    // pause stream
    stream.pause();
    // make async API call...
    makeAPICall(line, function() {
        // then resume to process next line
        stream.resume();
    });
})

尽管文档没有明确指定流是逐行读取的,但我认为对于文件流来说,情况应该是这样的。至少在其他语言和平台上,文本流是以这种方式工作的,我看不出Node流有什么不同的理由。


1
流不是行缓冲的,它会给你一块数据,这个数据可能会或者不会以换行符结束。 - BCoates

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接