MongoDB将文档从一个集合移动到另一个集合

70
如何在MongoDB中将文档从一个集合移动到另一个集合?例如:我有很多位于集合A中的文档,我想将所有1个月前的文档移动到集合B中(这些1个月前的文档不应该在集合A中)。
使用聚合可以进行复制。但我想要做的是移动文档。 有什么方法可以用来移动文档?
15个回答

118

@markus-w-mahlberg 展示的批量操作(@mark-mullin 进行了优化)虽然高效,但写法不安全。如果批量插入失败,批量删除仍将继续。为了确保在移动记录时不会丢失任何记录,请改用以下方式:

function insertBatch(collection, documents) {
  var bulkInsert = collection.initializeUnorderedBulkOp();
  var insertedIds = [];
  var id;
  documents.forEach(function(doc) {
    id = doc._id;
    // Insert without raising an error for duplicates
    bulkInsert.find({_id: id}).upsert().replaceOne(doc);
    insertedIds.push(id);
  });
  bulkInsert.execute();
  return insertedIds;
}

function deleteBatch(collection, documents) {
  var bulkRemove = collection.initializeUnorderedBulkOp();
  documents.forEach(function(doc) {
    bulkRemove.find({_id: doc._id}).removeOne();
  });
  bulkRemove.execute();
}

function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
  print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
  var count;
  while ((count = sourceCollection.find(filter).count()) > 0) {
    print(count + " documents remaining");
    sourceDocs = sourceCollection.find(filter).limit(batchSize);
    idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);

    targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
    deleteBatch(sourceCollection, targetDocs);
  }
  print("Done!")
}


该过程将id保留在目标和源中。 (我的情况与帖子有些不同,我偶然发现了这个功能。) - A. Rabus
异步更新在此发布:https://dev59.com/el8d5IYBdhLWcg3wNQRg#54715607 - Matt Wills
5
您能给出运行 moveDocuments 函数的示例吗? - perezmlal
1
这是Mongosh吗?非常酷的解决方案。现在的任务是在pymongo中重写它。 - Ben Fourie
moveDocuments(db.collectionA, db.collectionB, {'some.nested.param': 'value'}, 100); - kopos
显示剩余3条评论

57

更新 2

请不要再为此答案点赞。如@jasongarber的答案所述,他的回答在各个方面都更好。

更新

这个由@jasongarber提供的答案是更安全的方法,应该代替我的方法使用。


如果我理解正确您想移动所有超过1个月的文档,且使用mongoDB 2.6,则没有理由不使用bulk操作,这是我知道的最有效的执行多个操作的方式:

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()

这应该相当快,并且具有这样的优势,即在批量插入过程中出现问题时,原始数据仍然存在。


编辑

为了防止使用过多内存,您可以在处理每个 x 个文档时执行批量操作:

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
      counter ++
      if( counter % x == 0){
        bulkInsert.execute()
        bulkRemove.execute()
        bulkInsert = db.target.initializeUnorderedBulkOp()
        bulkRemove = db.source.initializeUnorderedBulkOp()
      }
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()

或者在 UI 工具如 Robomongo 中: db.getCollection('source').find({}).forEach(function(doc) { db.getCollection('target').insert(doc); db.getCollection('source').remove(doc);}) - Arthur
3
@Arthur:您的方法有两个主要缺点。它非常慢,而且在最糟糕的情况下,您可能会有不完整的收集内容难以再次同步。 - Markus W Mahlberg
这对我没有效果。我尝试在一个拥有5000万条记录的集合上进行操作,试图将其中约2500万条记录移出。但是,在查找查询时出现了错误致命错误:CALL_AND_RETRY_2#分配失败-内存不足。这台服务器拥有32GB的内存,而这些记录只有5个字段。整个集合的数据总大小只有约5GB。 - UpTheCreek
你的意思是在查找查询中设置限制吗?我甚至还没有执行任何操作,内存溢出错误发生在foreach期间。如果批量功能只能处理10k块,那么它们实际上并不适合此目的 :/(抱歉,我不是在向你发泄情绪,我很感激你的帮助,只是有点沮丧!) - UpTheCreek
@UpTheCreek 请看编辑。我只是想排除我们有内存问题的可能性。因为听起来像是这个问题。可能是由操作系统造成的。 - Markus W Mahlberg
显示剩余2条评论

23

插入和删除:

var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
    db.collectionB.insert(doc);
    db.collectionA.remove(doc);
});

注意:对于大型集合或包含大型文档的集合,此方法可能会相当缓慢。


insert()和remove()是优化的解决方案吗? - manojpt
没有想法 :) 您也可以使用管理工具进行转储和恢复。 - user1907906
3
这不是原子操作,存在将某些内容插入到集合B中但未从A中删除的潜在可能性。 - user1965449
5
最后一行应该是});,而不仅仅是}。缺少闭合括号。 - Jabba

13

$out 是用于创建带有数据的新集合的,因此使用 $out。

db.oldCollection.aggregate([{$out : "newCollection"}])

然后使用 drop

db.oldCollection.drop()

3
请注意,如果已经存在该名称的集合,则此操作将覆盖整个集合(而不是将匹配的文档附加到旧集合中)。 - Josip Filipović

5

您可以使用范围查询从sourceCollection获取数据并将游标数据保存在变量中,然后循环遍历并插入到目标集合:

 var doc = db.sourceCollection.find({
        "Timestamp":{
              $gte:ISODate("2014-09-01T00:00:00Z"),
              $lt:ISODate("2014-10-01T00:00:00Z")
        }
 });

 doc.forEach(function(doc){
    db.targetCollection.insert(doc);
 })

希望我的翻译能对您有所帮助!

是的!!insert()和remove()是优化解决方案吗? - manojpt
这可能会对你有所帮助:https://dev59.com/cG025IYBdhLWcg3wjGpO - Ninad

5

第一种方法(使用Mongo Dump)

1.从集合获取转储

mongodump -d 数据库名 -c 源集合名

2.恢复到集合

mongorestore -d 数据库名 -c 目标集合名 dir=dump/数据库名/源集合名.bson

第二种方法

运行聚合

db.getCollection('源集合名').aggregate([ { $match: {"emailAddress" : "apitester@mailinator.com"} }, { $out: "目标集合名" } ])

第三种方法(最慢的方法)

运行for循环

db.getCollection('源集合名').find().forEach(function(docs){ db.getCollection('目标集合名').insert(docs); }) print("Rollback Completed!");


5
这是对@jasongarber答案的更新,使用更近期的mongo 'bulkWrite'操作(在这里阅读文档),并且将整个过程异步化,这样您就可以将其作为依赖于完成的更广泛脚本的一部分来运行。
async function moveDocuments (sourceCollection, targetCollection, filter) {
  const sourceDocs = await sourceCollection.find(filter)

  console.log(`Moving ${await sourceDocs.count()} documents from ${sourceCollection.collectionName} to ${targetCollection.collectionName}`)

  const idsOfCopiedDocs = await insertDocuments(targetCollection, sourceDocs)

  const targetDocs = await targetCollection.find({_id: {$in: idsOfCopiedDocs}})
  await deleteDocuments(sourceCollection, targetDocs)

  console.log('Done!')
}

async function insertDocuments (collection, documents) {
  const insertedIds = []
  const bulkWrites = []

  await documents.forEach(doc => {
    const {_id} = doc

    insertedIds.push(_id)
    bulkWrites.push({
      replaceOne: {
        filter: {_id},
        replacement: doc,
        upsert: true,
      },
    })
  })

  if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})

  return insertedIds
}

async function deleteDocuments (collection, documents) {
  const bulkWrites = []

  await documents.forEach(({_id}) => {
    bulkWrites.push({
      deleteOne: {
        filter: {_id},
      },
    })
  })

  if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
}

为什么需要再次从“targetCollection”中读取“targetDocs”?如果出现任何错误,批量操作不会返回错误吗? - walderich
你的脚本中只有一个错别字:第8行应该改为:const targetDocs = await sourceCollection.find({_id: {$in: idsOfCopiedDocs}})。你需要从源集合中获取文档以便删除 :) - Torsten Barthel

3
这是对@Markus W Mahlberg的重新陈述。
将恩惠作为一个函数返回。
function moveDocuments(sourceCollection,targetCollection,filter) {
    var bulkInsert = targetCollection.initializeUnorderedBulkOp();
    var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
    sourceCollection.find(filter)
        .forEach(function(doc) {
        bulkInsert.insert(doc);
        bulkRemove.find({_id:doc._id}).removeOne();
        }
  )
  bulkInsert.execute();
  bulkRemove.execute();
}

一个使用示例
var x = {dsid:{$exists: true}};
moveDocuments(db.pictures,db.artifacts,x)

将所有顶级元素为dsid的文档从pictures移动到artifacts集合。

3

从性能角度考虑,使用一个命令删除大量文档(尤其是如果你在查询部分拥有索引)可能更加优秀,而不是逐个删除它们。

例如:

db.source.find({$gte: start, $lt: end}).forEach(function(doc){
   db.target.insert(doc);
});
db.source.remove({$gte: start, $lt: end});

0

我有2297个集合,包含1500万个文档,但有些集合是空的。

仅使用copyTo脚本失败了,但通过这个脚本优化:

db.getCollectionNames().forEach(function(collname) {
    var c = db.getCollection(collname).count();
    if(c!==0){
      db.getCollection(collname).copyTo('master-collection');
      print('Copied collection ' + collname);
    }
});

对我来说一切都正常。

NB:copyTo已被弃用,因为它会阻塞读/写操作:所以我认为如果您知道在此操作期间数据库不可用,那么这是可以的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接