使用Node.js编写大文件

39

我正在使用可写流在 Node.js 中编写一个大文件:

var fs     = require('fs');
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' });

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write( lines[i] );
    }
}

我想知道在不使用drain事件的情况下,这种方案是否安全?如果不安全(我认为是这种情况),那么写入任意大的数据到文件的模式是什么?


你可以使用 Promises 来排空。https://dev59.com/gFUL5IYBdhLWcg3wb3lz - Junior Usca
8个回答

29

这就是我最终做到的方法。其背后的想法是创建一个实现ReadStream接口的可读流,然后使用pipe()方法将数据传输到可写流。

var fs = require('fs');
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var readStream = new MyReadStream();

readStream.pipe(writeStream);
writeStream.on('close', function () {
    console.log('All done!');
});

可以从mongoose的QueryStream中获取MyReadStream类的示例。


24
当我们只想向文件中写入内容时,为什么需要使用ReadStream()函数? - krjampani
@nab 谢谢。当使用管道时,似乎没有添加\r\n进行换行,因此将每一行连接成一行... - loretoparisi
无法找到 QueryStream。 - Yaman KATBY
ReadStream接口链接已损坏。 - Peeyush Kushwaha

13

drain的想法是你可以在此处使用它进行测试:

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write(lines[i]); //<-- the place to test
    }
}

你目前的实现方式不是可重入的,因此你需要重新设计架构来实现“可重入”。

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
        }
    }
}

然而,这是否意味着在等待时你需要继续缓冲 getLines 呢?

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines,
    buffer = {
     remainingLines = []
    };
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
           buffer.remainingLines = lines.slice(i);
           break;
           //notice there's no way to re-run this once we leave here.
        }
    }
}

stream.on('drain',function(){
  if (buffer.remainingLines.length){
    for (var i = 0; i < buffer.remainingLines.length; i++) {
      var written = stream.write(buffer.remainingLines[i]); //<-- the place to test
      if (!written){
       //do something here to wait till you can safely write again
       //this means prepare a buffer and wait till you can come back to finish
       //  lines[i] -> remainder
       buffer.remainingLines = lines.slice(i);
      }
    }
  }
});

3
不必使用自己的缓冲区,Node.js已经为您做好了。请阅读源文件nodejs-source/lib/fs.js#WriteStream.prototype.write。 - ayanamist

7

处理这个问题最干净的方法是将您的行生成器制作为可读流 - 让我们称其为lineReader。然后以下内容将自动为您处理缓冲区和排水:

lineReader.pipe(fs.createWriteStream('someFile.txt'));

如果您不想创建可读流,可以监听write的输出以了解缓冲区满度,并像这样做出响应:
var i = 0, n = lines.length;
function write () {
  if (i === n) return;  // A callback could go here to know when it's done.
  while (stream.write(lines[i++]) && i < n);
  stream.once('drain', write);
}
write();  // Initial call.

这种情况的更多示例可以在这里找到。


4

我发现使用流处理大文件的性能很差 - 这是因为您无法设置足够大的输入缓冲区(至少我不知道有什么好的方法可以这样做)。以下是我的解决方案:

var fs = require('fs');

var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');

var buf = new Buffer(1024 * 1024), len, prev = '';

while(len = fs.readSync(i, buf, 0, buf.length)) {

    var a = (prev + buf.toString('ascii', 0, len)).split('\n');
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : '';

    var out = '';
    a.forEach(function(line) {

        if(!line)
            return;

        // do something with your line here

        out += line + '\n';
    });

    var bout = new Buffer(out, 'ascii');
    fs.writeSync(o, bout, 0, bout.length);
}

fs.closeSync(o);
fs.closeSync(i);

你有关于readStream/writeStreamreadSync/writeSync的基准测试吗?以确认这个答案。谢谢。 - loretoparisi
“bout”变量被定义为什么? - B''H Bi'ezras -- Boruch Hashem

3
这个问题的几个建议答案完全忽略了流的要点。
这个模块可以帮助https://www.npmjs.org/package/JSONStream 然而,假设情况如描述的那样,并自己编写代码。您正在使用默认为ObjectMode = true的MongoDB作为流进行读取。
如果您尝试直接流式传输到文件,则会出现问题,例如“无效的非字符串/缓冲区块”错误。
解决此类问题的方法非常简单。
只需在可读和可写之间放置另一个Transform,以适当地将对象可读性转换为字符串可写性即可。
示例代码解决方案:
var fs = require('fs'),
    writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
    stream = require('stream'),
    stringifier = new stream.Transform();
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
    this.push(JSON.stringify(data));
    this.push('\n');
    done();
}
rowFeedDao.getRowFeedsStream(merchantId, jobId)
.pipe(stringifier)
.pipe(writeStream).on('error', function (err) {
   // handle error condition
}

2

[编辑] 更新后的Node.js writable.write(...) API文档 表示:

返回值仅供参考。即使返回false,你也可以继续写入。但是,写操作将在内存中缓冲,因此最好不要过度使用。相反,在写入更多数据之前,请等待drain事件。

[原文] 来自stream.write(...)文档 (我强调的部分):

如果字符串已刷新到内核缓冲区,则返回true。如果内核缓冲区已满,并且数据将来会被发送,则返回false

我的理解是,如果给定的字符串立即写入底层操作系统缓冲区,则“write”函数返回true;如果尚未写入,但是将由write函数写入(例如,可能由WriteStream为您缓冲),则返回false,这样您就无需再次调用“write”。


1
但是,“在这种方式下编写文件描述符,在流排空之前关闭描述符会冒险发送无效(关闭的)FD。”让我想到缓冲区已满意味着它不能再接受你的任何代码。老实说,我不知道,只是尽力给出了我的最佳答案。 - jcolebrand
@jcolebrand:是的,我也不知道,但我猜“drain”事件只是表示操作系统已准备好立即写入,以防您真的想避免任何缓冲,无论是您自己的还是来自WriteStream“write”方法的缓冲。然而,“drain”的文档提到了“可以安全地再次写入”,这可能是措辞不当,也可能是反驳我的解释的证据! - maerics

1
如果您没有输入流,则无法轻松使用管道。上述方法都对我不起作用,排水事件也不会触发。根据Tyler的答案,解决方法如下:
var lines[]; // some very large array
var i = 0;

function write() {
    if (i < lines.length)  {
        wstream.write(lines[i]), function(err){
            if (err) {
                console.log(err);
            } else {
                i++;
                write();
            }
        });
    } else {
        wstream.end();
        console.log("done");
    }
};
write();

1
这里是一个在现代 Node.js 中使用生成器函数、Readable.frompipeline 的简单解决方案。无需直接与流进行交互。
import { createWriteStream } from "fs";
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

function* generate() {
  for (const line of lines) {
    yield line;
    yield '\n';
  }
}

await pipeline(
  Readable.from(generate()),
  createWriteStream("someFile.txt"),
);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接