以流方式读取CSV文件并将其存储到数据库中

4
我有一些巨大的csv文件,需要将它们存储到Mongo数据库中。因为这些文件太大了,所以我需要使用流。在数据写入数据库时,我会暂停流。
var fs = require('fs');
var csv = require('csv');
var mongo = require('mongodb');

var db = mongo.MongoClient.connect...

var readStream = fs.createReadStream('hugefile.csv');
readStream.on('data', function(data) {
  readStream.pause();
  csv.parse(data.toString(), { delimiter: ','}, function(err, output) {
    db.collection(coll).insert(data, function(err) {
      readStream.resume();
    });
  });
});
readStream.on('end', function() {
  logger.info('file stored');
});

但是csv.parse出现了错误,因为我需要逐行读取文件以将其处理为csv,并将其转换为mongodb的json格式。也许我不应该暂停它们,而是使用一个接口。我还没有找到任何解决方案。任何帮助都将不胜感激!
2个回答

4
我认为您可能想从原始数据流中创建一系列行。
这是来自split软件包的一个示例。 https://www.npmjs.com/package/split
fs.createReadStream(file)
.pipe(split())
.on('data', function (line) {
  //each chunk now is a seperate line! 
})

适用于您的示例,它可能如下所示:

var readStream = fs.createReadStream('hugefile.csv');
var lineStream = readStream.pipe(split());
lineStream.on('data', function(data) {
    //remaining code unmodified

-1

我不确定bulk()在2015年是否存在,但任何试图从大型源导入项目的人都应考虑使用它们。

var fs = require('fs');
var csv = require('fast-csv');
var mongoose = require('mongoose');

var db = mongoose.connect...

var counter = 0;        // to keep count of values in the bulk()
const BULK_SIZE = 1000;
var bulkItem = Item.collection.initializeUnorderedBulkOp();

var readStream = fs.createReadStream('hugefile.csv');
const csvStream = csv.fromStream(readStream, { headers: true });
csvStream.on('data', data => {
    counter++;
    bulkOrder.insert(order);

    if (counter === BATCH_SIZE) {
      csvStream.pause();
      bulkOrder.execute((err, result) => {
        if (err) console.log(err);
        counter = 0;
        bulkItem = Item.collection.initializeUnorderedBulkOp();
        csvStream.resume();
      });
    }
  }
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接