使用聚合可以进行复制。但我想要做的是移动文档。 有什么方法可以用来移动文档?
@markus-w-mahlberg 展示的批量操作(@mark-mullin 进行了优化)虽然高效,但写法不安全。如果批量插入失败,批量删除仍将继续。为了确保在移动记录时不会丢失任何记录,请改用以下方式:
function insertBatch(collection, documents) {
var bulkInsert = collection.initializeUnorderedBulkOp();
var insertedIds = [];
var id;
documents.forEach(function(doc) {
id = doc._id;
// Insert without raising an error for duplicates
bulkInsert.find({_id: id}).upsert().replaceOne(doc);
insertedIds.push(id);
});
bulkInsert.execute();
return insertedIds;
}
function deleteBatch(collection, documents) {
var bulkRemove = collection.initializeUnorderedBulkOp();
documents.forEach(function(doc) {
bulkRemove.find({_id: doc._id}).removeOne();
});
bulkRemove.execute();
}
function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
var count;
while ((count = sourceCollection.find(filter).count()) > 0) {
print(count + " documents remaining");
sourceDocs = sourceCollection.find(filter).limit(batchSize);
idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);
targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
deleteBatch(sourceCollection, targetDocs);
}
print("Done!")
}
更新 2
请不要再为此答案点赞。如@jasongarber的答案所述,他的回答在各个方面都更好。
更新
这个由@jasongarber提供的答案是更安全的方法,应该代替我的方法使用。
如果我理解正确您想移动所有超过1个月的文档,且使用mongoDB 2.6,则没有理由不使用bulk操作,这是我知道的最有效的执行多个操作的方式:
> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
function(doc){
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
}
)
> bulkInsert.execute()
> bulkRemove.execute()
这应该相当快,并且具有这样的优势,即在批量插入过程中出现问题时,原始数据仍然存在。
编辑
为了防止使用过多内存,您可以在处理每个 x
个文档时执行批量操作:
> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
function(doc){
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
counter ++
if( counter % x == 0){
bulkInsert.execute()
bulkRemove.execute()
bulkInsert = db.target.initializeUnorderedBulkOp()
bulkRemove = db.source.initializeUnorderedBulkOp()
}
}
)
> bulkInsert.execute()
> bulkRemove.execute()
致命错误:CALL_AND_RETRY_2#分配失败-内存不足
。这台服务器拥有32GB的内存,而这些记录只有5个字段。整个集合的数据总大小只有约5GB。 - UpTheCreek插入和删除:
var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
db.collectionB.insert(doc);
db.collectionA.remove(doc);
});
注意:对于大型集合或包含大型文档的集合,此方法可能会相当缓慢。
});
,而不仅仅是}
。缺少闭合括号。 - Jabba$out 是用于创建带有数据的新集合的,因此使用 $out。
db.oldCollection.aggregate([{$out : "newCollection"}])
然后使用 drop
db.oldCollection.drop()
您可以使用范围查询从sourceCollection获取数据并将游标数据保存在变量中,然后循环遍历并插入到目标集合:
var doc = db.sourceCollection.find({
"Timestamp":{
$gte:ISODate("2014-09-01T00:00:00Z"),
$lt:ISODate("2014-10-01T00:00:00Z")
}
});
doc.forEach(function(doc){
db.targetCollection.insert(doc);
})
第一种方法(使用Mongo Dump)
1.从集合获取转储
mongodump -d 数据库名 -c 源集合名
2.恢复到集合
mongorestore -d 数据库名 -c 目标集合名 dir=dump/数据库名/源集合名.bson
第二种方法
运行聚合
db.getCollection('源集合名').aggregate([ { $match: {"emailAddress" : "apitester@mailinator.com"} }, { $out: "目标集合名" } ])
第三种方法(最慢的方法)
运行for循环
db.getCollection('源集合名').find().forEach(function(docs){ db.getCollection('目标集合名').insert(docs); }) print("Rollback Completed!");
async function moveDocuments (sourceCollection, targetCollection, filter) {
const sourceDocs = await sourceCollection.find(filter)
console.log(`Moving ${await sourceDocs.count()} documents from ${sourceCollection.collectionName} to ${targetCollection.collectionName}`)
const idsOfCopiedDocs = await insertDocuments(targetCollection, sourceDocs)
const targetDocs = await targetCollection.find({_id: {$in: idsOfCopiedDocs}})
await deleteDocuments(sourceCollection, targetDocs)
console.log('Done!')
}
async function insertDocuments (collection, documents) {
const insertedIds = []
const bulkWrites = []
await documents.forEach(doc => {
const {_id} = doc
insertedIds.push(_id)
bulkWrites.push({
replaceOne: {
filter: {_id},
replacement: doc,
upsert: true,
},
})
})
if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
return insertedIds
}
async function deleteDocuments (collection, documents) {
const bulkWrites = []
await documents.forEach(({_id}) => {
bulkWrites.push({
deleteOne: {
filter: {_id},
},
})
})
if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
}
function moveDocuments(sourceCollection,targetCollection,filter) {
var bulkInsert = targetCollection.initializeUnorderedBulkOp();
var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
sourceCollection.find(filter)
.forEach(function(doc) {
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
}
)
bulkInsert.execute();
bulkRemove.execute();
}
var x = {dsid:{$exists: true}};
moveDocuments(db.pictures,db.artifacts,x)
从性能角度考虑,使用一个命令删除大量文档(尤其是如果你在查询部分拥有索引)可能更加优秀,而不是逐个删除它们。
例如:
db.source.find({$gte: start, $lt: end}).forEach(function(doc){
db.target.insert(doc);
});
db.source.remove({$gte: start, $lt: end});
我有2297个集合,包含1500万个文档,但有些集合是空的。
仅使用copyTo脚本失败了,但通过这个脚本优化:
db.getCollectionNames().forEach(function(collname) {
var c = db.getCollection(collname).count();
if(c!==0){
db.getCollection(collname).copyTo('master-collection');
print('Copied collection ' + collname);
}
});
对我来说一切都正常。
NB:copyTo已被弃用,因为它会阻塞读/写操作:所以我认为如果您知道在此操作期间数据库不可用,那么这是可以的。