使用Nodejs读取非常大的文件(~10GB),逐行处理并写入到另一个文件。

7
我有一个特定格式的10 GB日志文件,我想逐行处理这个文件,然后在应用一些转换后将输出写入其他文件。我正在使用node进行此操作。
虽然这种方法可以,但是它需要很长时间才能完成。我能够在JAVA中在30-45分钟内完成此操作,但在node中,完成相同的工作需要超过160分钟。以下是代码:
以下是读取输入中每一行的初始化代码。
var path = '../10GB_input_file.txt';
var output_file = '../output.txt';

function fileopsmain(){

    fs.exists(output_file, function(exists){
        if(exists) {
            fs.unlink(output_file, function (err) {
                if (err) throw err;
                console.log('successfully deleted ' + output_file);
            });
        }
    });

    new lazy(fs.createReadStream(path, {bufferSize: 128 * 4096}))
        .lines
        .forEach(function(line){
            var line_arr = line.toString().split(';');
            perform_line_ops(line_arr, line_arr[6], line_arr[7], line_arr[10]);
        }
    );

}

这是一种执行某些操作并将输入传递给write方法写入输出文件的方法。
function perform_line_ops(line_arr, range_start, range_end, daynums){

    var _new_lines = '';
    for(var i=0; i<days; i++){
        //perform some operation to modify line pass it to print
    }

    write_line_ops(_new_lines);
}

以下方法用于将数据写入新文件。
function write_line_ops(line) {
    if(line != null && line != ''){
        fs.appendFileSync(output_file, line);
    }
}

我希望将这个时间缩短到15-20分钟。是否有可能做到呢?

还有,为了记录,我正在尝试在一台配备i7处理器8 GB内存的英特尔电脑上运行。


@Kevin B 我正在做同样的事情,我正在处理一个400MB的文件,处理时间大约是 ~2.5 分钟。虽然我不确定问题出在哪里。 - HVT7
我建议您首先限定问题范围。创建一个简单的测试应用程序,只需创建一个读取流并在不担心行和不写入磁盘的情况下读取整个文件。看看需要多长时间。如果很快,那么您可以逐步添加一个部分并跟踪进度。接下来将其管道传输到新文件名并查看性能。如果原始读取速度较慢,则问题出现在nodejs流中较低的位置,您将不得不更低级别地解决性能问题。 - jfriend00
@jfriend00 感谢您的建议,但我想寻找的是“我是否在这里使用了正确的方法”?因为如果不是,那么至少我可以被引导朝着正确的方向前进。 - HVT7
@jfriend00 好的 =)。那么,你建议使用哪些工具来执行这项任务? - HVT7
我建议你编写一个简单的测试应用程序,以查看纯Node.js流是否足够快。从方程式中删除所有其他变量(如“lazy”和行处理)。在大文件上运行一个简单的测试应用程序,只需使用流逐块读取它。我想我已经多次描述过这个过程。你必须进行一些测试,以确定哪种方法适合你。 - jfriend00
显示剩余5条评论
4个回答

6

你可以轻松地完成此操作,无需使用模块。例如:

var fs = require('fs');
var inspect = require('util').inspect;

var buffer = '';
var rs = fs.createReadStream('foo.log');
rs.on('data', function(chunk) {
  var lines = (buffer + chunk).split(/\r?\n/g);
  buffer = lines.pop();
  for (var i = 0; i < lines.length; ++i) {
    // do something with `lines[i]`
    console.log('found line: ' + inspect(lines[i]));
  }
});
rs.on('end', function() {
  // optionally process `buffer` here if you want to treat leftover data without
  // a newline as a "line"
  console.log('ended on non-empty buffer: ' + inspect(buffer));
});

你猜测了导致性能问题的原因,并提供了一种替代方案。但在进行一些测试之前,我们似乎还不知道性能问题出在哪里。 - jfriend00
@mscdex,我正在测试这段代码,所以现在可以将其保留为开放性问题。这个解决方案可能会有帮助。我一定会让大家知道上述方法的结果。 - HVT7
@mscdex,仍然需要相同的时间。我猜写入文件时需要一些延迟。 - HVT7
@HVT7 你使用的是哪个node/io.js版本?另外,你可以展示一下你用来处理日志的Java代码。 - mscdex
@mscdex JAVA代码很简单,使用I/O流,并且应用于转换的逻辑是相同的。其中没有使用第三方库。 - HVT7
显示剩余2条评论

0
执行速度慢是因为你没有使用Node的异步操作。实质上,你是这样执行代码的:
> read some lines
> transform
> write some lines
> repeat

虽然你可以同时做一切,或者至少阅读和写作。这里的一些答案就是这样做的,但是语法至少很复杂。使用scramjet,你可以用几行简单的代码来完成:

const {StringStream} = require('scramjet');

fs.createReadStream(path, {bufferSize: 128 * 4096})
    .pipe(new StringStream({maxParallel: 128})    // I assume this is an utf-8 file
    .split("\n")                                  // split per line
    .parse((line) => line.split(';'))             // parse line
    .map([line_arr, range_start, range_end, daynums] => {
        return simplyReturnYourResultForTheOtherFileHere(
            line_arr, range_start, range_end, daynums
        );                                         // run your code, return promise if you're doing some async work
    })
    .stringify((result) => result.toString())
    .pipe(fs.createWriteStream)
    .on("finish", () => console.log("done"))
    .on("error", (e) => console.log("error"))

这个应该会运行得更快。


0

我知道这已经过时了,但是...

猜测 appendFileSync() 向文件系统写入并等待响应。大量小的写操作通常很昂贵,如果你在Java中使用BufferedWriter,可以通过跳过一些write()来获得更快的结果。

使用其中一个异步写入方法,看看node是否合理地缓冲,或者将行写入大型node缓冲区,直到它被填满,并始终写入完整(或几乎完整)的缓冲区。通过调整缓冲区大小,您可以验证写入次数是否影响性能。我认为会有影响。


0

我猜不出你代码可能存在的瓶颈在哪里。

  • 你能添加lazy函数的库或源代码吗?
  • 你的perform_line_ops函数执行了多少操作?(if/else、switch/case、函数调用)

我已经根据你给出的代码创建了一个示例,虽然这并没有回答你的问题,但也许可以帮助你理解node如何处理这种情况。

const fs = require('fs')
const path = require('path')

const inputFile = path.resolve(__dirname, '../input_file.txt')
const outputFile = path.resolve(__dirname, '../output_file.txt')

function bootstrap() {
    // fs.exists is deprecated
    // check if output file exists
    // https://nodejs.org/api/fs.html#fs_fs_exists_path_callback
    fs.exists(outputFile, (exists) => {
        if (exists) {
            // output file exists, delete it
            // https://nodejs.org/api/fs.html#fs_fs_unlink_path_callback
            fs.unlink(outputFile, (err) => {
                if (err) {
                    throw err
                }

                console.info(`successfully deleted: ${outputFile}`)
                checkInputFile()
            })
        } else {
            // output file doesn't exist, move on
            checkInputFile()
        }
    })
}

function checkInputFile() {
    // check if input file can be read
    // https://nodejs.org/api/fs.html#fs_fs_access_path_mode_callback
    fs.access(inputFile, fs.constants.R_OK, (err) => {
        if (err) {
            // file can't be read, throw error
            throw err
        }

        // file can be read, move on
        loadInputFile()
    })
}

function saveToOutput() {
    // create write stream
    // https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options
    const stream = fs.createWriteStream(outputFile, {
        flags: 'w'
    })

    // return wrapper function which simply writes data into the stream
    return (data) => {
        // check if the stream is writable
        if (stream.writable) {
            if (data === null) {
                stream.end()
            } else if (data instanceof Array) {
                stream.write(data.join('\n'))
            } else {
                stream.write(data)
            }
        }
    }
}

function parseLine(line, respond) {
    respond([line])
}

function loadInputFile() {
    // create write stream
    const saveOutput = saveToOutput()
    // create read stream
    // https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options
    const stream = fs.createReadStream(inputFile, {
        autoClose: true,
        encoding: 'utf8',
        flags: 'r'
    })

    let buffer = null

    stream.on('data', (chunk) => {
        // append the buffer to the current chunk
        const lines = (buffer !== null)
            ? (buffer + chunk).split('\n')
            : chunk.split('\n')

        const lineLength = lines.length
        let lineIndex = -1

        // save last line for later (last line can be incomplete)
        buffer = lines[lineLength - 1]

        // loop trough all lines
        // but don't include the last line
        while (++lineIndex < lineLength - 1) {
            parseLine(lines[lineIndex], saveOutput)
        }
    })

    stream.on('end', () => {
        if (buffer !== null && buffer.length > 0) {
            // parse the last line
            parseLine(buffer, saveOutput)
        }

        // Passing null signals the end of the stream (EOF)
        saveOutput(null)
    })
}

// kick off the parsing process
bootstrap()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接