使用mongoose在MongoDB中进行批量upsert

31

有没有在mongoose中执行批量upsert的选项?基本上是有一个数组,如果元素不存在,则插入该元素,如果存在,则更新它(我正在使用自定义_ids)

当我使用.insert时,MongoDB返回重复键错误E11000(应该更新),但是插入多个新文档确实可以正常工作:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });

使用.save会返回一个错误,指出参数必须是单个文档:

Users.save(data, function(err){
   ...
}
这个答案指出没有这样的选项,但它是针对C#特定的,而且已经3年了。所以我想知道是否有使用mongoose的任何选项来做到这一点?

谢谢!


什么是批量更新插入?如果设置了更新插入标志为true,则在没有找到要更新的文档时会创建一个新文档。http://docs.mongodb.org/manual/reference/glossary/#term-upsert - joao
@joao 可能在“批量”操作 API 中被提及,就像所给出的答案中提到的那样。 - Neil Lunn
7个回答

23
在写作时,"mongoose" 没有特别指定,或者至少还没有。实际上,自 MongoDB shell 2.6 发布以来,所有通用的辅助方法都使用了 "Bulk operations API",就像它是一个底层的东西一样。在它的实现中,它首先尝试这样做,如果检测到旧版本的服务器,则会 "回退" 到旧实现。
所有 mongoose 方法 "当前" 都使用 "传统" 实现或写入关注响应和基本的传统方法。但是,从任何给定的 mongoose 模型中都可以访问 .collection 访问器,该访问器基本上访问了 "集合对象",而该对象是由 mongoose 本身实现的 "node native driver" 的基础。
 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

“mongoose方法”实际上知道连接可能尚未建立,因此会“排队”直到完成连接。而您正在“深入挖掘”的本机驱动程序则没有进行这种区分。
因此,您必须确保以某种方式建立了连接。但是只要小心操作,就可以使用本机驱动程序的方法。

谢谢!这个很好用。我本来想用joao的方法,但是我没能够通过.update()上传多个文档... 当然,我可以用for循环来实现,但是我猜批量上传更有效率?或者因为数据库连接已经打开了,所以没有区别? - user3122267
@user3122267,Upsert和Bulk基本上是“天壤之别”,不同甚至没有接近。 "upsert"会在不存在文档的情况下创建一个新文档,而"Bulk"则是批量操作。另一个选项是"multi",因为.update()默认只修改找到的“第一个”文档。喜欢这种方法吗?看看那些一无所知的评论者和那些真正拥有知识的回答者之间的巨大差异? - Neil Lunn
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Neil Lunn
我注意到这个答案和@konsumer的都是同步循环所有记录。我对在一个时钟周期内创建10个“bulk”操作与在10个单独的时钟周期内创建10个“bulk”操作(就Node中的内存使用而言)的性能差异很感兴趣。 - joeytwiddle
2
@joeytwiddle 只有在调用.execute()之后,“批量”操作才是异步的。这是因为向服务器来回传输数据会花费大量的IO,所以您要尽量减少此类操作。虽然在同步循环中可能会多次使用.execute()并使用多个连接,但您可以通过类似async.whilst或其他控件来改变它,其中迭代可以由回调来控制(因此在.execute()内部)以处理完成。这在Promises中有些困难,但仍然是可能的。 - Neil Lunn
我测试了这里提供的两种技术,并在使用 ulimit 的帮助下发现,如果 Node 创建文档的速度比 Mongo 存储它们(以及网络传输它们)的速度快,那么两者都可以填满 Node 的内存。因此,对于大量文档,我们应该限制一次执行的批量操作数量,通过等待完成来实现。同时运行 4 个并行操作可能是一个不错的方法。 - joeytwiddle

19

不需要像@neil-lunn建议的那样管理限制(1000)。 Mongoose已经处理了这个问题。 我以他的优秀答案为基础,完成了这个完整的基于Promise的实现和示例:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

这样做的好处是在完成后关闭连接,如果你关心的话,显示任何错误,但如果不关心则忽略它们(Promises中的错误回调是可选的)。而且它也非常快。我在这里分享我的发现。如果您想将所有eztv节目保存到数据库中,可以取消注释eztv相关内容,作为示例。


1
这样做会不会消耗更多的内存? - ECMAScript
不,这就是我的观点。Mongoose已经管理了批量操作计数。你不需要限制到1000,因为它已经限制到你的MongoDB设置的限制。 - konsumer
1
没错,这就是 bulk.execute 的作用。https://docs.mongodb.org/v3.0/reference/method/db.collection.initializeUnorderedBulkOp/ - konsumer
2
@ECMAScript 实际上,Neil 和 konsumer 的建议都会消耗相似数量的 Node 内存,因为两种技术都会在等待 Mongo 响应时不断创建文档。显然,只有当您打算插入的文档数量超过 RAM 容量时,这才是一个问题。 - joeytwiddle
1
@PirateApp 也许你的内存不足以容纳这个结构?你得到了什么错误?如果你没有足够的内存来容纳它,你可能需要使用串行承诺一个接一个地运行它们,或者批量运行它们的块。 - konsumer
显示剩余11条评论

7
await Model.bulkWrite(docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
})))


或者更加详细:

const bulkOps = docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
}))

Model.bulkWrite(bulkOps)
        .then(bulkWriteOpResult => console.log('BULK update OK:', bulkWriteOpResult))
        .catch(err => console.error('BULK update error:', err))

https://dev59.com/I1kS5IYBdhLWcg3wXFg9#60330161


5
我发布了一个用于Mongoose的插件,它公开了一个静态的upsertMany方法,以执行批量upsert操作并提供了Promise接口。
使用此插件而不是在基础集合上初始化自己的批量操作的一个附加好处是,此插件将您的数据转换为Mongoose模型的格式,然后再将其转换回普通对象进行upsert。这确保了应用Mongoose模式验证,并使数据脱敏并适合原始插入。
链接如下: https://github.com/meanie/mongoose-upsert-many https://www.npmjs.com/package/@meanie/mongoose-upsert-many 希望对你有所帮助!

1

如果您在db.collection中没有看到bulk方法,即出现错误如xxx变量没有方法:initializeOrderedBulkOp()

尝试更新您的mongoose版本。 显然,旧的mongoose版本不能传递所有基础的mongo db.collection方法。

npm install mongoose

对我来说解决了这个问题。


0

你可以使用mongoose的Model.bulkWrite()

const res = await Character.bulkWrite([
  {
    updateOne: {
      filter: { name: 'Will Riker' },
      update: { age: 29 },
      upsert: true
    }
  },
  {
    updateOne: {
      filter: { name: 'Geordi La Forge' },
      update: { age: 29 },
      upsert: true
    }
  }
]);

参考资料:https://masteringjs.io/tutorials/mongoose/upsert


0

最近我在我的电子商务应用程序中存储产品时不得不实现这一点。由于我每4个小时必须插入10000个项目,所以我的数据库经常超时。对我来说,一种选择是在连接到数据库时在mongoose中设置socketTimeoutMS和connectTimeoutMS,但这种方法感觉有些hacky,而且我不想操纵数据库的连接超时默认值。我还看到@neil lunn的解决方案采用了一个简单的同步方法,在for循环内部进行取模运算。这里是我的异步版本,我相信它可以更好地完成工作。

let BATCH_SIZE = 500
Array.prototype.chunk = function (groupsize) {
    var sets = [];
    var chunks = this.length / groupsize;

    for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) {
        sets[i] = this.slice(j, j + groupsize);
    }

    return sets;
}

function upsertDiscountedProducts(products) {

    //Take the input array of products and divide it into chunks of BATCH_SIZE

    let chunks = products.chunk(BATCH_SIZE), current = 0

    console.log('Number of chunks ', chunks.length)

    let bulk = models.Product.collection.initializeUnorderedBulkOp();

    //Get the current time as timestamp
    let timestamp = new Date(),

        //Keep track of the number of items being looped
        pendingCount = 0,
        inserted = 0,
        upserted = 0,
        matched = 0,
        modified = 0,
        removed = 0,

        //If atleast one upsert was performed
        upsertHappened = false;

    //Call the load function to get started
    load()
    function load() {

        //If we have a chunk to process
        if (current < chunks.length) {
            console.log('Current value ', current)

            for (let i = 0; i < chunks[current].length; i++) {
                //For each item set the updated timestamp to the current time
                let item = chunks[current][i]

                //Set the updated timestamp on each item
                item.updatedAt = timestamp;

                bulk.find({ _id: item._id })
                    .upsert()
                    .updateOne({
                        "$set": item,

                        //If the item is being newly inserted, set a created timestamp on it
                        "$setOnInsert": {
                            "createdAt": timestamp
                        }
                    })
            }

            //Execute the bulk operation for the current chunk
            bulk.execute((error, result) => {
                if (error) {
                    console.error('Error while inserting products' + JSON.stringify(error))
                    next()
                }
                else {

                    //Atleast one upsert has happened
                    upsertHappened = true;
                    inserted += result.nInserted
                    upserted += result.nUpserted
                    matched += result.nMatched
                    modified += result.nModified
                    removed += result.nRemoved

                    //Move to the next chunk
                    next()
                }
            })



        }
        else {
            console.log("Calling finish")
            finish()
        }

    }

    function next() {
        current++;

        //Reassign bulk to a new object and call load once again on the new object after incrementing chunk
        bulk = models.Product.collection.initializeUnorderedBulkOp();
        setTimeout(load, 0)
    }

    function finish() {

        console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed)

        //If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert
        if (upsertHappened) {
            console.log("Calling remove")
            remove()
        }


    }

    /**
     * Remove all the items that were not updated in the recent upsert or those items with a discount of 0
     */
    function remove() {

        models.Product.remove(
            {
                "$or":
                [{
                    "updatedAt": { "$lt": timestamp }
                },
                {
                    "discount": { "$eq": 0 }
                }]
            }, (error, obj) => {
                if (error) {
                    console.log('Error while removing', JSON.stringify(error))
                }
                else {
                    if (obj.result.n === 0) {
                        console.log('Nothing was removed')
                    } else {
                        console.log('Removed ' + obj.result.n + ' documents')
                    }
                }
            }
        )
    }
}

@neil-lunn,如果我没记错的话,你的解决方案会创建多个批量对象,并且它们都会异步执行,但是在我的解决方案中,我使其每次只有一个bulk.execute。 - PirateApp
1
据我理解,您正在串行处理批次。我认为这是正确的,以确保内存不会超载。但是一次只有一个批次,有时您的数据库将等待网络,有时网络将等待CPU。并行运行5-10个较小的批次(每当早期批次完成时,串行启动新的批次)可能会略微增加吞吐量,通过确保系统的所有可以工作的部分都在工作。 - joeytwiddle

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接