使用Node.js逐行读取解析大型日志文件

153
我可以帮助你翻译。这段内容是关于编程的,需要在Javascript/Node.js中对大型(5-10 Gb)的日志文件进行解析(使用Cube)。日志行的格式大致如下:
10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

我们需要逐行阅读,进行一些解析(例如剥离57SUCCESS),然后使用Cube的JS客户端将这些数据注入(https://github.com/square/cube)。
首先,在Node中按行读取文件的规范方式是什么?
这似乎是一个相当常见的在线问题: 很多答案似乎都指向了一堆第三方模块:

然而,这似乎是一个非常基本的任务 - 在stdlib中肯定有一种简单的方法按行读取文本文件吧?

其次,我需要处理每一行(例如将时间戳转换为Date对象,并提取有用的字段)。

最大化吞吐量的最佳方法是什么?是否有一种不会在读取每行或将其发送到Cube时阻塞的方式?

第三 - 我猜使用字符串分割和JS的包含等价物(IndexOf!= -1?)比正则表达式快得多?有没有人在Node.js中解析大量文本数据方面有很多经验?


我在node中构建了一个日志解析器,它使用一堆具有内置'捕获'的正则表达式字符串并将输出转换为JSON。如果您想进行计算,甚至可以在每个捕获上调用函数。这可能正是您想要的:https://npmjs.org/package/logax - Jess
一种更好的比较方式 https://betterprogramming.pub/a-memory-friendly-way-of-reading-files-in-node-js-a45ad0cc7bb6 - yashodha_h
13个回答

249

我寻找一种解析非常大(gbs)文件的方法,逐行使用流进行解析。所有第三方库和示例都不符合我的需求,因为它们不是逐行处理文件(如1、2、3、4...),或者将整个文件读入内存中。

以下解决方案可以使用流和管道逐行解析非常大的文件。为了测试,我使用了一个大小为2.1 gb、包含17,000,000条记录的文件。内存使用量不超过60 mb。

首先,安装event-stream软件包:

npm install event-stream

那么:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

输入图像描述

请告诉我它的进展如何!


9
这段代码不是同步的,而是异步的。如果在你的代码的最后一行后插入 console.log(lineNr),它将不能显示最终的行数,因为该文件是异步读取的。 - jfriend00
6
谢谢,这是我找到的唯一一个能够按预期暂停和恢复的解决方案。Readline 不能做到。 - Brent
3
很棒的例子,并且它确实会暂停。另外,如果您决定提前停止文件读取,可以使用s.end(); - zipzit
2
完美地工作。用它索引了1.5亿份文档到elasticsearch索引中。readline模块很头疼。它不会暂停并且每次在4,5千万份后都导致失败。浪费了一整天的时间。非常感谢你的答案。这个解决方法完美运行。 - Mandeep Singh
4
event-stream 被入侵了:https://medium.com/intrinsic/compromised-npm-package-event-stream-d47d08605502,但 4+ 版本似乎是安全的:https://blog.npmjs.org/post/180565383195/details-about-the-event-stream-incident - John Vandivier
显示剩余12条评论

88

您可以使用内置的 readline 包,查看文档请点击此处。使用创建一个新的输出流。

    var fs = require('fs'),
        readline = require('readline'),
        stream = require('stream');
    
    var instream = fs.createReadStream('/path/to/file');
    var outstream = new stream;
    outstream.readable = true;
    outstream.writable = true;
    
    var rl = readline.createInterface({
        input: instream,
        output: outstream,
        terminal: false
    });
    
    rl.on('line', function(line) {
        console.log(line);
        //Do your stuff ...
        //Then write to output stream
        rl.write(line);
    });

大文件处理需要一些时间,请告知是否有效。


2
正如所写,倒数第二行失败是因为cubestuff未定义。 - Greg
3
使用readline,是否可以暂停/恢复读取流以执行“做事情”区域的异步操作? - jchook
3
当我尝试暂停/恢复时,readline 给了我很多问题。它无法正确地暂停流,如果下游进程速度较慢,则会引发很多问题。 - Mandeep Singh

35

我非常喜欢@gerard的答案,实际上它应该是正确的答案。我做出了一些改进:

  • 代码位于一个类中(模块化)
  • 包括解析过程
  • 在外部提供继续进行的能力,以防止异步作业链接到读取CSV,例如插入到数据库或HTTP请求
  • 可以读取分块/批量大小,用户可以自己声明。我也在流中处理编码,以防文件具有不同的编码方式。

以下是代码:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

基本上,这是你将如何使用它:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

我用一个35GB的CSV文件测试过,对我有效,这就是为什么我选择基于@gerard的答案构建它的原因,欢迎反馈。

它花了多少时间? - Bernardo Dal Corno
1
显然,这里缺少了 pause() 调用,不是吗? - Vanuan
1
此外,这不会在结束时调用回调函数。因此,如果batchSize为100,文件大小为150,则只处理100个项目。我错了吗? - Vanuan

25

我使用了https://www.npmjs.com/package/line-by-line来读取一个文本文件中超过一百万行的内容。在这种情况下,占用 RAM 的容量约为 50-60 兆字节。

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });

1
“逐行读取”比所选答案更节省内存。对于一个包含100万行的CSV文件,所选答案会使我的节点进程占用800多兆字节的内存。而使用“逐行读取”,它始终保持在700多兆字节的低水平。此模块还可以使代码保持整洁易读。总共我需要读取大约1800万行,因此每个MB都很重要! - Neo
1
很遗憾,这个程序使用了自己的事件“line”,而不是标准的“chunk”,这意味着您将无法使用“pipe”。 - Rene Wooller
1
经过数小时的测试和搜索,这是唯一一个在lr.cancel()方法上真正停止的解决方案。在1毫秒内读取了5Gig文件的前1000行。太棒了!!! - Perez Lamed van Niekerk

19

Node.js文档提供了一个非常优雅的例子,使用Readline模块。

示例:逐行读取文件流

const { once } = require('node:events');
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

await once(rl, 'close');
注意:我们使用crlfDelay选项将所有CR LF('\r\n')实例识别为单个换行符。

在我的情况下,我想使用元素的 innerHTML 在 HTML 中显示整个文本,但是最后一行总是被截断,即使我在 CSS 中设置了 overflow: auto。这是怎么回事? - kakyo
好的,我明白了。我需要使用比我的padding参数更大的padding-bottom - kakyo
你能解释一下使用'readline'的目的吗?为什么我们不能只使用'readStream'来完成它呢? - Apoorva Ambhoj

8
除了逐行读取大文件外,您还可以分块读取它。有关详细信息,请参见本文
var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);

3
以下是否应该是比较而不是赋值:if(bytesRead = chunkSize) - Stefan Rein

4
使用原生的Node.js模块(fs,readline)通过流来读写文件:
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
                                       input:  fs.createReadStream('input.json'),
                                       output: fs.createWriteStream('output.json')
                                    });

rl.on('line', function(line) {
    console.log(line);

    // Do any 'line' processing if you want and then write to the output file
    this.output.write(`${line}\n`);
});

rl.on('close', function() {
    console.log(`Created "${this.output.path}"`);
});

4
我也曾遇到过同样的问题。在比较了几个似乎具有此功能的模块后,我决定自己来做,这比我想象的要简单。

代码片段: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

它涵盖了在闭包中打开的文件,fetchBlock()返回将从文件中获取一个块,并分割成数组(将处理最后获取的片段)。
我已经将块大小设置为每个读取操作的1024。这可能存在错误,但代码逻辑是明显的,请自行尝试。

3

根据这个问题的答案,我实现了一个类,可以使用fs.readSync()逐行同步读取文件。您可以使用Q promise使其“暂停”和“恢复”(jQuery需要DOM,因此无法在nodejs中运行):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}

2

node-byline使用流,所以对于大文件,我更喜欢使用它。

对于日期转换,我建议使用Moment.js

为了最大化吞吐量,您可以考虑使用软件集群。有一些不错的模块很好地包装了Node原生的cluster模块。我喜欢Isaacs的cluster-master。例如,您可以创建一个由x个工作进程组成的集群,它们都计算一个文件。

要比较拆分和正则表达式的基准,请使用Benchmark.js。我目前还没有测试过它。Benchmark.js可作为一个Node模块使用。


2
注意,由于存在重大性能问题,moment.js现在已经不再受欢迎,主要原因是它的庞大占用空间、无法进行树摇和深度根植但现在广泛不受喜爱的可变性。即使是它自己的开发人员也几乎放弃了它。一些很好的替代品是date-fnsday.js;这里有一篇更详细的文章:https://www.skypack.dev/blog/2021/02/the-best-javascript-date-libraries/。 - Ezekiel Victor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接