Node.js中如何在createReadStream内使用async/await?

8
我正在逐行读取CSV文件,并将其插入/更新到MongoDB中。预期输出应为: 1. console.log(row); 2. console.log(cursor); 3.console.log("stream");
但是实际输出却像这样: 1. console.log(row); console.log(row); console.log(row); console.log(row); console.log(row); ............ ............ 2. console.log(cursor); 3.console.log("stream");
请告诉我我错过了什么。
const csv = require('csv-parser');
const fs = require('fs');

var mongodb = require("mongodb");

var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {

  var db = client.db("UKCompanies");
  collection = db.collection("company");
  startRead();
});
var cursor={};

async function insertRec(row){
  console.log(row);
  cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
  if(cursor){
    console.log(cursor);
  }else{
    console.log('not exist')
  }
  console.log("stream");
}



async function startRead() {
  fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      await insertRec(row);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}
3个回答

22
在您的startRead()函数中,await insertRec()在处理insertRec()时不会停止更多的data事件流。因此,如果您不希望下一个data事件在insertRec()完成之前运行,您需要暂停并恢复数据流。
async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      try {
        stream.pause();
        await insertRec(row);
      } finally {
        stream.resume();
      }
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}

提醒一下,如果 insertRec() 失败,还需要进行一些错误处理。


感谢 @jfriend00 - Pritam Parua
2
流暂停/恢复非常有问题,很少起作用。 - Pian0_M4n
1
@Pian0_M4n那么正确的解决方案是什么呢? - Hung Tran
@HungTran 我还没有找到一个可靠的方法来进行带暂停的直播。无论我尝试什么,都没有成功。 - Pian0_M4n
@HungTran 我还没有找到一个可靠的方法来进行带暂停的流媒体。我尝试过的所有方法都没有成功。 - undefined

4

Node 10+开始,ReadableStream拥有属性Symbol.asyncIterator,可以使用for-await-of处理流。

async function startRead() {
    const readStream = fs.createReadStream('./data/inside/6.csv');    
    
    for await (const row of readStream.pipe(csv())) {
        await insertRec(row);
    }

    console.log('CSV file successfully processed');
}

2

这是在此情况下预期的行为,因为您的on数据侦听器会在流中有数据可用时异步触发insertRec。所以您插入方法的第一行代码被视为并行执行。如果您想控制此行为,可以在创建读取流时使用highWaterMark (https://nodejs.org/api/stream.html#stream_readable_readablehighwatermark)属性。这样,您将每次获取一个记录,但我不确定您的用例是什么。

类似于这样

fs.createReadStream(`somefile.csv`, {
  "highWaterMark": 1
})

此外,您还没有等待startRead方法。我建议将其包装在promise中,并在end监听器中解决它,否则您将不知道何时处理完成。示例如下:

function startRead() {
  return new Promise((resolve, reject) => {
    fs.createReadStream(`somepath`)
      .pipe(csv())
      .on("data", async row => {
        await insertRec(row);
      })
      .on("error", err => {
        reject(err);
      })
      .on("end", () => {
        console.log("CSV file successfully processed");
        resolve();
      });
  });

}

1
设置highWaterMark并不能让您限制data事件的速率。相反,OP应该实现一个可配置为逐个文档write或批量写入文档writev的流Writable。highWaterMark可以让您控制内存压力。 - jorgenkg
@jorgenkg 说得没错。谢谢你的澄清。 - Ashish Modi
@jorgenkg - "对于以对象模式操作的流,highWaterMark 指定了对象的总数" - https://nodejs.org/api/stream.html#stream_buffering - Ashish Modi
是的 - 这个数字表示在(读/写)流的内部缓冲区中将被缓冲的对象数量。对象将始终使用write一次处理一个。highWaterMark指示可以为流实例缓冲多少个对象。 - jorgenkg
在这个例子中,最后一个结果没有被等待,因为"end"事件将在"data"事件处理程序执行期间触发。如果进程现在退出,那么最后一条记录将会丢失。 - simbolo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接