在NodeJS中从流缓冲数据以执行批量插入

4

如何在NodeJS中高效地进行缓冲,以批量插入事件流中的数据,而不是每次接收到一条记录就进行单独插入。以下是我心中的伪代码:

// Open MongoDB connection

mystream.on('data', (record) => {
   // bufferize data into an array
   // if the buffer is full (1000 records)
   // bulk insert into MongoDB and empty buffer
})

mystream.on('end', () => {
   // close connection
})

这看起来现实吗? 是否有任何可能的优化?现有的库是否能够提供帮助?


2
Node.js原生的stream API听起来非常适合,你应该考虑使用一个可写流(Writable)。可以通过设置highWaterMark来控制缓冲区的大小。可写流类有一个final()函数,当流完成时会被调用。这可以用于关闭数据库连接。 - jorgenkg
谢谢你的回答,我也考虑了那个选项,那可能是解决这个问题的最佳方式,你获取的数据越多,就会有更多的数据放入缓冲区,而你在缓冲区中接收到的数据将自动填充MongoDB数据库,我认为这样你也可以控制数据流,并使输入的数据自动销毁。我计划使用这种方法处理小到相当大的数据集(几KB到5-10GB的流数据)。 - dbrrt
1
MongoDB的原生驱动程序(以及Mongoose API)都公开了一个DB游标接口,可以封装为stream.Readable(stream.Readable.from()),然后将其传输到缓冲区Writable中。因此,脚本不会获取比可存储在其可写缓冲区中更多的数据。 - jorgenkg
这��例子非常接近我所寻找的 https://github.com/sorribas/mongo-write-stream/blob/master/index.js - dbrrt
2个回答

5
使用NodeJS的stream库,可以简洁高效地实现如下:
const stream = require('stream');
const util = require('util');
const mongo = require('mongo');

const streamSource; // A stream of objects from somewhere

// Establish DB connection
const client = new mongo.MongoClient("uri");
await client.connect();

// The specific collection to store our documents
const collection = client.db("my_db").collection("my_collection");

await util.promisify(stream.pipeline)( 
  streamSource, 
  stream.Writable({
    objectMode: true,
    highWaterMark: 1000,
    writev: async (chunks, next) => {
      try {
        const documents = chunks.map(({chunk}) => chunk);
        
        await collection.insertMany(docs, {ordered: false});

        next();
      }
      catch( error ){
        next( error );
      }
    }
  })
);

再次感谢您的帮助,这可能是我问题的解决方案。 - dbrrt
1
我在更复杂的使用情况下相对较新于Nodejs流。这个答案帮了我很多!非常感谢。 - victorkurauchi
1
需要注意的是,这种方法并不能保证 writev 总是会得到 1000 个块,它可能是介于 1 到 1000 之间的任何数字。 - morten.c
1
是的,最多1000个块/文档。这取决于在上一次调用“writev”和调用“next”之间向写缓冲区添加了多少对象。 - jorgenkg

0

最终我得到了一个没有依赖关系的解决方案。

const { MongoClient } = require("mongodb")
const url = process.env.MONGO_URI || "mongodb://localhost:27019";
const connection = MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true })
    Promise.resolve(connection)
        .then((db) => {
            const dbName = "databaseName";
            const collection = 'collection';
            const dbo = db.db(dbName);

            let buffer = []

            stream.on("data", (row: any) => {
                buffer.push(row)
                if (buffer.length > 10000) {
                    dbo.collection(collection).insertMany(buffer, {ordered: false});
                    buffer = []
                }
            });

            stream.on("end", () => {
                // insert last chunk
                dbo.collection(collection).insertMany(buffer, {ordered: false})
                    .then(() => {
                        console.log("Done!");
                        db.close();
                    })
                
            });
            stream.on("error", (err) => console.log(err));

        })
        .catch((err) => {
            console.log(err)
        })

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接