在Node.js中暂停可读流

9
我正在使用csv-to-json,这是一个很好的处理CSV文件的库。
我有一个用例,需要处理一个大型(>200万行)的CSV文件并将其插入到数据库中。为了避免内存问题,我打算将CSV作为流进行处理,每10000行暂停一次流,将行插入到我的数据库中,然后恢复流。但由于某种原因,我似乎无法暂停流。
例如,以下代码:
const rs = fs.createReadStream("./foo.csv");
rs.pause();

let count = 0;

csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})

"count"被记录了200次(这是我CSV文件中的行数)- 我本来期望它不会记录任何东西,因为在将其传递给"fromStream()"之前流已经被暂停了。

你是按行插入数据库吗?为什么不创建一个队列并限制同时执行的请求,或者使用一些异步方法来防止内存泄漏和避免刷新请求呢? - Asif Saeed
2
@AsifSaeed,我只对暂停流或关于是否可行的信息感兴趣 - 无论如何还是谢谢。 - nicholaswmin
3个回答

6
以下是一个由该库的创建者提出的解决方案,可以在此问题中进行跟踪:Issue
var tmpArr=[];
rs.pipe(csv({},{objectMode:true})).pipe(new Writable({
  write: function(json, encoding,callback){
    tmpArr.push(json);
    if (tmpArr.length===10000){
      myDb.save(tmpArr,function(){
        tmpArr=[];
        callback();
      })
    }else{
      callback();
    }
  } ,
  objectMode:true
}))
.on('finish',function(){
  if (tmpArr.length>0){
    myDb.save(tmpArr,function(){
      tmpArr=[];
    })
  }
})

我实际上通过取消管道操作来模拟暂停,但这并不是理想的方法:
let count = 0;
var csvParser=csv()
.fromStream(rs)
.on("json", (json) => {
  rows.push(json);
  if (rows.length % 1000 === 0) {
    rs.unpipe();
    // clear `rows` right after `unpipe`
    const entries = rows;
    rows = [];
    this._insertEntries(db, entries, ()=> {
      rs.pipe(csvParser);
    });
  }
})

1
使用可写流来暂停它并在其中执行诸如数据库更新之类的操作是一个不错的主意。谢谢分享! - Johnny
嗨,我实现了第一段代码。它可以工作,问题是对于大文件(> 500,000行),节点会以某种方式启动一个新的相同进程与当前进程并行运行。为了清楚起见,我设置了一个“计数”变量来计算已读取多少行。终端打印出“计数”从100(我的间隔)开始,然后是200,然后是300等等。在达到约500,000行后,还有另一行也从100开始增加。第一个“计数”(现在> 500,000)仍然在增加。 - Tri Nguyen
@TriNguyen,请确保检查“maxRowLength”。刚刚运行了880K行,没有任何问题。 - KuN

3

我利用csvtojson还有fromString(...)方法的事实,并使用以下方法。

  1. 使用line-by-line包读取固定数量的行,即10000行,并将它们存储在一个数组中。
  2. 使用lr.pause()暂停line-by-line读取器。
  3. 在索引0处插入标题行(如果您的csv文件有标题行,则使用简单的条件语句忽略由line-by-line读取器返回的第一行)。
  4. 使用EOL字符连接所有行,这将为您提供该CSV文件的10000行的字符串表示形式。
  5. 使用csvtojson的.fromString(...)将块的字符串表示形式转换为JSON对象并将其插入到数据库中。
  6. 通过lr.resume()恢复流,并重复此过程,直到line-by-line读取器发出'end'事件。

以下是完整代码:

const CSVToJSON = require("csvtojson");
const LineByLineReader = require("line-by-line");
const { EOL } = require("os");

const BLOCK_LIMIT = 10000;

let lines = [];
let isFirstLineProcessed = false;

const lr = new LineByLineReader("./foo.csv");

lr
.on("line", (line) => {

    // remove this if statement if your CSV does not contain headers line
    if (!isFirstLineProcessed) {
        isFirstLineProcessed = true;
        return;
    }

    lines.push(line);

    if (lines.length === BLOCK_LIMIT) {
        lr.pause();

        // insert headers string ("field1, field2, ...") at index 0;
        lines.splice(0, 0, headers);

        // join all lines using newline operator ("\n") to form a valid csv string
        const csvBlockString = lines.join(EOL);
        const entries = [];

        lines = [];      

        csv()
            .fromString(csvBlockString)
            .on("json", (json) => {
                entries.push(json);
            })
            .on("done", () => {
                this._insertEntries(db, entries, ()=> {
                    lr.resume();
               });
            });
    }
})
.on("end", () => {
    console.log("done");
});

2
你必须修改csv2json库才能完成此操作。
首先请阅读以下链接: https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states 当你使用rs.pause()时,流处于暂停模式。实际上,即使你不这样做,可读流也会以暂停模式启动。
流在以下3种情况下进入恢复模式:
- 有.on('data')事件监听器 - 有.pipe()方法连接 - 显式调用readable.resume()
在你的情况下,fromStream()方法已将pipe方法连接到你的可读流中,从而恢复了该流。
参考代码: https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378
Converter.prototype.fromStream=function(readStream,cb){
  if (cb && typeof cb ==="function"){
    this.wrapCallback(cb);
  }
  process.nextTick(function(){
    readStream.pipe(this);
  }.bind(this))
  return this;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接