这就是我最终采用的方法。其思路是创建一个实现 ReadStream 接口的可读流,然后使用 `pipe()` 方法将数据传输到可写流。
var fs = require('fs');
var pipeline = require('stream').pipeline;
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
// MyReadStream is assumed to be a Readable stream defined elsewhere
// (e.g. modelled on mongoose's QueryStream).
var readStream = new MyReadStream();
// pipeline() pipes readStream into writeStream exactly like .pipe(), and also
// reports an error from either stream to the callback and destroys both streams,
// so a failure is neither an unhandled 'error' event nor a leaked open file.
pipeline(readStream, writeStream, function (err) {
  if (err) {
    console.error('Write failed:', err);
    return;
  }
  console.log('All done!');
});
`MyReadStream` 类的实现示例可以参考 mongoose 的 `QueryStream`。
……`\r\n` 进行换行,因此每一行都被连接成了一行…… —— loretoparisi

`drain` 的思路是:你可以在这里检测 `write()` 的返回值:
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});
var lines;
// Keep fetching batches until getLines() yields a falsy value, and hand
// every line of each batch straight to the file stream.
for (lines = getLines(); lines; lines = getLines()) {
  for (var idx = 0; idx < lines.length; idx += 1) {
    stream.write(lines[idx]); //<-- the place to test
  }
}
你目前的实现方式不是可重入的,因此你需要重新设计架构来实现“可重入”。
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});
var lines;
// Same batch loop, but now the boolean returned by write() is inspected.
for (lines = getLines(); lines; lines = getLines()) {
  for (var idx = 0; idx < lines.length; idx += 1) {
    var accepted = stream.write(lines[idx]); //<-- the place to test
    if (!accepted) {
      // Placeholder: wait here until it is safe to write again, i.e.
      // prepare a buffer and come back later to finish the batch
      // (lines[idx] -> remainder).
    }
  }
}
然而,这是否意味着在等待时你需要继续缓冲 getLines 呢?
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});
var lines,
  buffer = {
    // Lines already pulled from getLines() but not yet passed to stream.write().
    remainingLines: [],
    // True from the moment stream.write() returns false until the next 'drain'.
    waiting: false
  };
while ((lines = getLines())) {
  if (buffer.waiting) {
    // Already waiting for 'drain': queue the whole batch behind the earlier
    // remainder so lines reach the file in order. This loop is synchronous, so
    // 'drain' cannot fire before it finishes - every later batch piles up here
    // in memory, which is why this design needs restructuring.
    for (var k = 0; k < lines.length; k++) {
      buffer.remainingLines.push(lines[k]);
    }
    continue;
  }
  for (var i = 0; i < lines.length; i++) {
    var written = stream.write(lines[i]); //<-- the place to test
    if (!written) {
      // false means "buffer is full, wait for 'drain'", but lines[i] itself
      // WAS accepted by the stream; keep only the lines after it, otherwise
      // lines[i] would be written twice.
      buffer.remainingLines = lines.slice(i + 1);
      buffer.waiting = true;
      // notice the getLines() loop itself cannot be resumed from here; the
      // rest is left to the 'drain' handler below.
      break;
    }
  }
}
stream.on('drain', function () {
  // Take ownership of the queued lines and write them until the buffer fills again.
  var pending = buffer.remainingLines;
  buffer.remainingLines = [];
  buffer.waiting = false;
  for (var j = 0; j < pending.length; j++) {
    if (!stream.write(pending[j])) {
      // pending[j] was accepted; keep only the unwritten tail for the next 'drain'.
      buffer.remainingLines = pending.slice(j + 1);
      buffer.waiting = true;
      break;
    }
  }
});
处理这个问题最干净的方法是把你的行生成器实现为一个可读流(这里称为 `lineReader`)。这样,下面的代码就会自动为你处理缓冲和 `drain` 事件:
// pipe() handles backpressure on its own: it pauses the source while the
// file stream's buffer is full and resumes it on 'drain'.
var source = lineReader;
source.pipe(fs.createWriteStream('someFile.txt'));
请检查 `write()`
的输出以了解缓冲区满度,并像这样做出响应:var i = 0, n = lines.length;
/**
 * Writes lines[i..n) to `stream`, stopping whenever write() reports a full
 * buffer and resuming from the same index on the next 'drain' event.
 * Uses the `i`, `n` (declared on the preceding line), `lines` and `stream`
 * variables from the surrounding script.
 */
function write () {
  while (i < n) {
    // write() still accepts lines[i] when it returns false; false only means
    // we should wait for 'drain' before handing over more data.
    if (!stream.write(lines[i++])) {
      stream.once('drain', write);
      return;
    }
  }
  // Every line has been handed to the stream; no 'drain' listener is left
  // behind. A callback could go here to know when it's done.
}
write(); // Initial call.
这种情况的更多示例可以在这里找到。
我发现使用流处理大文件的性能很差 - 这是因为您无法设置足够大的输入缓冲区(至少我不知道有什么好的方法可以这样做)。以下是我的解决方案:
var fs = require('fs');
var StringDecoder = require('string_decoder').StringDecoder;
var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');
// Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
var buf = Buffer.alloc(1024 * 1024), len, prev = '';
// Decodes UTF-8 incrementally: a multi-byte character split across two chunks
// is held back until its remaining bytes arrive instead of being corrupted
// ('ascii' decoding also cleared the high bit of every non-ASCII byte).
var decoder = new StringDecoder('utf8');

// Runs the per-line work over `lines` and writes the result to `o` in a single
// writeSync call. Empty lines are skipped.
function writeLines(lines) {
  var out = '';
  lines.forEach(function (line) {
    if (!line)
      return;
    // do something with your line here
    out += line + '\n';
  });
  if (out) {
    var bout = Buffer.from(out, 'utf8');
    fs.writeSync(o, bout, 0, bout.length);
  }
}

try {
  while ((len = fs.readSync(i, buf, 0, buf.length, null))) {
    var a = (prev + decoder.write(buf.subarray(0, len))).split('\n');
    // The last piece may be an incomplete line; carry it into the next chunk.
    prev = a.pop();
    writeLines(a);
  }
  // Flush what is left at EOF. Without this, a final line that has no
  // trailing '\n' would be lost.
  writeLines([prev + decoder.end()]);
} finally {
  // Release both descriptors even if reading or writing throws.
  fs.closeSync(o);
  fs.closeSync(i);
}
你能提供 `readStream`/`writeStream` 与 `readSync`/`writeSync`
的基准测试吗?以确认这个答案。谢谢。 - loretoparisivar fs = require('fs'),
writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
  stream = require('stream'),
  // writableObjectMode lets the DAO feed in plain row objects while the
  // readable side emits text; this replaces mutating the private
  // _writableState after construction.
  stringifier = new stream.Transform({
    writableObjectMode: true,
    transform: function (data, encoding, done) {
      // One JSON document per line.
      done(null, JSON.stringify(data) + '\n');
    }
  });
// pipeline() reports an error from ANY stage (source, stringifier, file) to the
// callback and destroys every stream; chained .pipe() calls only observed
// errors on the final write stream.
stream.pipeline(
  rowFeedDao.getRowFeedsStream(merchantId, jobId),
  stringifier,
  writeStream,
  function (err) {
    if (err) {
      // handle error condition
      console.error(err);
    }
  }
);
[编辑] 更新后的 Node.js `writable.write(...)` API 文档中写道:
返回值仅供参考。即使返回false,你也可以继续写入。但是,写操作将在内存中缓冲,因此最好不要过度使用。相反,在写入更多数据之前,请等待drain事件。
[原文] 来自 `stream.write(...)` 文档(重点是我加的):
如果字符串已刷新到内核缓冲区,则返回 `true`。如果内核缓冲区已满,数据将在之后发送,则返回 `false`。
我的理解是:如果给定的字符串立即写入了底层操作系统缓冲区,`write` 函数返回 `true`;如果尚未写入、但之后会由 `write` 机制写入(例如由 WriteStream 替你缓冲),则返回 `false`,这样你就无需再次调用 `write`。
var lines = [/* some very large array */];
var i = 0;
/**
 * Writes lines[i], then continues with the next line only from that write's
 * completion callback, so at most one line is pending in the stream at a time.
 * Ends the stream after the last line.
 * Assumes `wstream` is a writable stream created elsewhere — TODO confirm.
 */
function write() {
  if (i < lines.length) {
    // The completion callback is write()'s second argument.
    wstream.write(lines[i], function (err) {
      if (err) {
        console.log(err);
      } else {
        i++;
        write();
      }
    });
  } else {
    wstream.end();
    console.log("done");
  }
}
write();
使用 `Readable.from` 和 `pipeline`
的简单解决方案。无需直接与流进行交互。import { createWriteStream } from "fs";
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';
/**
 * Yields every entry of `lines`, each followed by a '\n' separator, so that
 * each line ends up on its own row of the output file.
 */
function* generate() {
  const separator = '\n';
  for (const entry of lines) {
    yield entry;
    yield separator;
  }
}

// Readable.from() adapts the generator into a stream; pipeline() takes care of
// backpressure and settles once the write stream has finished.
const source = Readable.from(generate());
const destination = createWriteStream("someFile.txt");
await pipeline(source, destination);