我需要构建一个用于处理大型CSV文件以在bluebird.map()调用中使用的函数。考虑到文件的潜在大小,我希望使用流式处理。
该函数应接受一个流(CSV文件)和一个函数(用于处理来自流的块),并在文件读取结束时返回一个Promise(已解决)或错误(被拒绝)。
因此,我从以下内容开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
现在,我有两个相关的问题:
- 我需要限制实际处理的数据量,以避免创建内存压力。
- 作为
processor
参数传递的函数通常是异步的,例如通过基于promise的库(当前为:pg-promise
)将文件内容保存到数据库中。因此,它会在内存中创建一个promise并继续移动。
pg-promise
库有管理此操作的函数,如page(),但我无法理解如何将流事件处理程序与这些promise方法混合使用。目前,我在每个read()
之后的readable
部分的处理程序中返回一个promise,这意味着我创建了大量承诺的数据库操作,并最终失效,因为我达到了进程内存限制。
有人有可用作起点的工作示例吗?
更新: 可能有不止一种方法来完成此任务,但是以下方法可以运行:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人认为这种方法存在潜在的问题吗?
pg-promise
最近添加的page
不是白费功夫 ;) - vitaly-treturn undefined
,因为如果你什么都不返回的话,它就会自动返回undefined ;) - vitaly-ttask()
上使用catch()
并返回this.page()
吗? - alphadogg