NodeJS,promises,streams - 处理大型CSV文件

12

我需要构建一个用于处理大型CSV文件以在bluebird.map()调用中使用的函数。考虑到文件的潜在大小,我希望使用流式处理。

该函数应接受一个流(CSV文件)和一个函数(用于处理来自流的块),并在文件读取结束时返回一个Promise(已解决)或错误(被拒绝)。

因此,我从以下内容开始:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

现在,我有两个相关的问题:

  1. 我需要限制实际处理的数据量,以避免创建内存压力。
  2. 作为processor参数传递的函数通常是异步的,例如通过基于promise的库(当前为:pg-promise)将文件内容保存到数据库中。因此,它会在内存中创建一个promise并继续移动。

pg-promise库有管理此操作的函数,如page(),但我无法理解如何将流事件处理程序与这些promise方法混合使用。目前,我在每个read()之后的readable部分的处理程序中返回一个promise,这意味着我创建了大量承诺的数据库操作,并最终失效,因为我达到了进程内存限制。

有人有可用作起点的工作示例吗?

更新: 可能有不止一种方法来完成此任务,但是以下方法可以运行:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

有人认为这种方法存在潜在的问题吗?


看起来很整洁,如果这个能够工作,那么干得好!我很高兴pg-promise最近添加的page不是白费功夫 ;) - vitaly-t
最后在readDataFromStream函数中简化一下就好了 ;) 你不需要return undefined,因为如果你什么都不返回的话,它就会自动返回undefined ;) - vitaly-t
实际上,这可能存在问题...当您调用db.task时,您没有处理其结果,因此如果它被拒绝,承诺库将抛出一个错误,指出您的拒绝未被处理。 - vitaly-t
我应该在task()上使用catch()并返回this.page()吗? - alphadogg
我已经更新了我的答案 - 它给出了解决您问题的整个思路。 - vitaly-t
4个回答

8

您可能会想要查看promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

支持背压和一切操作。


5
以下是一个完整的应用程序,它正确地执行了与您想要的相同类型的任务:它将文件作为流读取,将其解析为CSV,并将每一行插入数据库。
const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });

请注意,我所做的唯一更改是使用库csv-parse而不是csv,作为更好的选择。
添加了方法stream.read的使用,该方法来自spex库,可以正确地为与promises一起使用的Readable流提供服务。

这样做会尝试在query("INSERT…")完成后立即从parser中读取下一个项目,而不管下一个项目是否已经可读吗?或者parser.read()返回一个Promise? - Bergi
此外,OP 寻找的返回 Promise 的 processor 回调函数发生了什么? - Bergi
@Bergi 我的理解是parser.read()是同步的,就像它展示的那样。如果它不是同步的,那么显然需要将其包装成一个promise。而且readable只会被触发一次,而不是每次读取操作都会触发,这是我的理解。至于返回promise的处理器,他只是在寻找数据处理完成时的resolve和失败时的reject,而我的例子提供了这个功能,即任务将相应地解决/拒绝。 - vitaly-t
是的,我自己也不太确定关于流部分的内容,而且我写的示例是基于问题提供的代码。如果那个代码有误,那么我的代码也会有误。但是,它仍然展示了一般的处理方法。 - vitaly-t
@vitaly-t:哦,还有,在早些评论中,您说“readable”事件只会触发一次。但实际上,它可以被触发多次。这会影响您的代码吗? - alphadogg
显示剩余8条评论

2

我发现了一种略微更好的方法来做同样的事情,这个方法具有更多的控制性。这是一个最小化的框架,具有精确的并行控制。当并行值为1时,所有记录都按顺序处理,而不需要将整个文件存储在内存中,我们可以增加并行值以实现更快的处理速度。

      const csv = require('csv');
      const csvParser = require('csv-parser')
      const fs = require('fs');

      const readStream = fs.createReadStream('IN');
      const writeStream = fs.createWriteStream('OUT');

      const transform = csv.transform({ parallel: 1 }, (record, done) => {
                                           asyncTask(...) // return Promise
                                           .then(result => {
                                             // ... do something when success
                                             return done(null, record);
                                           }, (err) => {
                                             // ... do something when error
                                             return done(null, record);
                                           })
                                       }
                                     );

      readStream
      .pipe(csvParser())
      .pipe(transform)
      .pipe(csv.stringify())
      .pipe(writeStream);

这允许对每个记录执行异步任务。

为了返回一个 Promise,我们可以返回一个空 Promise,并在流结束时完成它。

    .on('end',function() {
      //do something wiht csvData
      console.log(csvData);
    });

1

好的,我考虑在函数中使用async.queue,返回一个最终完成文件(或未完成)的承诺。然而,我想知道如何将像Bluebird这样的承诺库与基于流的大文件处理相结合。 ('pg-promise包括spex`,提供更高级别的承诺函数) - alphadogg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接