我在数据库中有大量文件,想知道如何遍历所有文档并更新它们,每个文档都有不同的值。
cursor.forEach()
。collection.find(query).forEach(function(doc) {
// handle
}, function(err) {
// done or error
});
db.collection.find(query).forEach(function(err, doc) {
// handle
});
collection.find(query, { stream: true })
.each(function(doc){
// handle doc
})
.error(function(err){
// handle error
})
.success(function(){
// final callback
});
collection.find(query).stream()
.on('data', function(doc){
// handle doc
})
.on('error', function(err){
// handle error
})
.on('end', function(){
// final callback
});
.forEach
回调中更新文档在.forEach
回调中更新文档的唯一问题是您无法确定何时更新所有文档。
为解决此问题,您应该使用一些异步控制流解决方案。以下是一些选项:
以下是使用async
的示例,使用其queue
特性:
var q = async.queue(function (doc, callback) {
// code for your update
collection.update({
_id: doc._id
}, {
$set: {hi: 'there'}
}, {
w: 1
}, callback);
}, Infinity);
var cursor = collection.find(query);
cursor.each(function(err, doc) {
if (err) throw err;
if (doc) q.push(doc); // dispatching doc to async.queue
});
q.drain = function() {
if (cursor.isClosed()) {
console.log('all items have been processed');
db.close();
}
}
.forEach
? - chovy.stream
方法已被弃用,现在我们应该使用 .cursor
方法。 - Alex K使用 mongodb
驱动程序和现代 NodeJS 的异步/等待,一个好的解决方案是使用 next()
方法:
const collection = db.collection('things')
const cursor = collection.find({
bla: 42 // find all things where bla is 42
});
let document;
while ((document = await cursor.next())) {
await collection.findOneAndUpdate({
_id: document._id
}, {
$set: {
blu: 43
}
});
}
这将导致一次只需要在内存中处理一个文档,而不像被接受的答案那样,在处理文档之前,许多文档都会进入内存。在“大型集合”(如问题所述)的情况下,这可能很重要。
如果文档很大,可以通过使用投影来进一步改善,以便仅从数据库获取所需文档的字段。
var MongoClient = require('mongodb').MongoClient,
assert = require('assert');
MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {
assert.equal(err, null);
console.log("成功连接到 MongoDB。");
var query = {
"category_code": "生物技术"
};
db.collection('companies').find(query).toArray(function(err, docs) {
assert.equal(err, null);
assert.notEqual(docs.length, 0);
docs.forEach(function(doc) {
console.log(doc.name + " 是一家 " + doc.category_code + " 公司。");
});
db.close();
});
});
注意,调用.toArray
会导致应用程序提取整个数据集。
var MongoClient = require('mongodb').MongoClient,
assert = require('assert');
MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {
assert.equal(err, null);
console.log("成功连接到 MongoDB 数据库。");
var query = {
"category_code": "biotech"
};
var cursor = db.collection('companies').find(query);
function(doc) {
cursor.forEach(
console.log(doc.name + " 是一家 " + doc.category_code + " 公司。");
},
function(err) {
assert.equal(err, null);
return db.close();
}
);
});
这段代码是使用Node.js和MongoDB进行数据操作的示例。在连接MongoDB服务后,查询了一个名为“biotech”类别的公司,并通过forEach()方法遍历了所有匹配的公司文档,输出每个公司名称以及其所属的类别代码。注意,find()
返回的游标被分配给 var cursor
。采用这种方法,我们将数据流到应用程序,而不是一次性在内存中获取所有数据并消耗数据。由于find()
实际上直到我们尝试使用它提供的一些文档之前不会向数据库发出请求,因此可以立即创建一个游标。 cursor
的目的是描述我们的查询。 cursor.forEach
的第二个参数表示当驱动程序耗尽数据或发生错误时要执行的操作。toArray()
强制进行了数据库调用。这意味着我们需要所有文档,并希望它们以数组
形式返回。MongoDB
以批处理格式返回数据。下面的图像显示,来自游标(从应用程序)到MongoDB
的请求。forEach
比toArray
更好,因为我们可以在文档到达之前进行处理,直到我们到达末尾。与toArray
相反-我们要等待所有文档都被检索并建立整个数组。这意味着我们没有从驱动程序和数据库系统共同批处理结果到应用程序中获得任何优势。批处理旨在提供内存开销和执行时间的效率。如果您可以在应用程序中利用它,请充分利用。之前的回答都没有提到批量更新,这使得它们非常缓慢 - 比使用bulkWrite的解决方案要慢几十倍或几百倍。
假设你想将每个文档中的一个字段的值加倍,以下是如何快速完成且内存消耗固定的方法:
// Double the value of the 'foo' field in all documents
let bulkWrites = [];
const bulkDocumentsSize = 100; // how many documents to write at once
let i = 0;
db.collection.find({ ... }).forEach(doc => {
i++;
// Update the document...
doc.foo = doc.foo * 2;
// Add the update to an array of bulk operations to execute later
bulkWrites.push({
replaceOne: {
filter: { _id: doc._id },
replacement: doc,
},
});
// Update the documents and log progress every `bulkDocumentsSize` documents
if (i % bulkDocumentsSize === 0) {
db.collection.bulkWrite(bulkWrites);
bulkWrites = [];
print(`Updated ${i} documents`);
}
});
// Flush the last <100 bulk writes
db.collection.bulkWrite(bulkWrites);
这里是使用Mongoose游标异步与承诺的示例:
new Promise(function (resolve, reject) {
collection.find(query).cursor()
.on('data', function(doc) {
// ...
})
.on('error', reject)
.on('end', resolve);
})
.then(function () {
// ...
});
参考资料:
现在你可以在异步函数中使用:
for await (let doc of collection.find(query)) {
await updateDoc(doc);
}
// all done
它很好地序列化了所有更新。
doc
可以是 const
因为它的作用域在循环内部。 - spikyjtnode-mongodb-native
支持endCallback
参数用于cursor.forEach
函数,以便处理整个迭代后的事件。请参考官方文档查看详情:http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#forEach。forEach(iteratorCallback, endCallback)
中,当没有更多数据时(error
为未定义),会调用 endCallback(error)
。@chovy - Tien DoLeonid的回答很好,但我希望加强使用Async/promises的重要性,并提供一个使用promises的不同解决方案示例。
这个问题的最简单解决方案是在forEach循环中调用更新。通常,您不需要在每个请求后关闭db连接,但如果确实需要关闭连接,则要小心。只有确定所有更新已经执行完毕,才能关闭它。
这里的一个常见错误是在分派所有更新后调用db.close()
,而不知道它们是否已完成。如果这样做,将会出现错误。
collection.find(query).each(function(err, doc) {
if (err) throw err;
if (doc) {
collection.update(query, update, function(err, updated) {
// handle
});
}
else {
db.close(); // if there is any pending update, it will throw an error there
}
});
然而,db.close()
也是一个异步操作(参考 其回调选项),你可能很幸运,这段代码可以无误地完成。但只有在需要更新小集合中的少量文档时才能正常工作(因此请勿尝试)。
由于已经有人提出了使用异步的解决方案 (Leonid),下面是使用 Q promises 的解决方案。
var Q = require('q');
var client = require('mongodb').MongoClient;
var url = 'mongodb://localhost:27017/test';
client.connect(url, function(err, db) {
if (err) throw err;
var promises = [];
var query = {}; // select all docs
var collection = db.collection('demo');
var cursor = collection.find(query);
// read all docs
cursor.each(function(err, doc) {
if (err) throw err;
if (doc) {
// create a promise to update the doc
var query = doc;
var update = { $set: {hi: 'there'} };
var promise =
Q.npost(collection, 'update', [query, update])
.then(function(updated){
console.log('Updated: ' + updated);
});
promises.push(promise);
} else {
// close the connection after executing all promises
Q.all(promises)
.then(function() {
if (cursor.isClosed()) {
console.log('all items have been processed');
db.close();
}
})
.fail(console.error);
}
});
});
var promises = calls.map(myUpdateFunction(doc));
,我得到了错误doc未定义
。 - Philip O'BrienDatabase name: users
Collection name: jobs
===========================
Documents
{ "_id" : ObjectId("1"), "job" : "Security", "name" : "Jack", "age" : 35 }
{ "_id" : ObjectId("2"), "job" : "Development", "name" : "Tito" }
{ "_id" : ObjectId("3"), "job" : "Design", "name" : "Ben", "age" : 45}
{ "_id" : ObjectId("4"), "job" : "Programming", "name" : "John", "age" : 25 }
{ "_id" : ObjectId("5"), "job" : "IT", "name" : "ricko", "age" : 45 }
==========================
这段代码:
var MongoClient = require('mongodb').MongoClient;
var dbURL = 'mongodb://localhost/users';
MongoClient.connect(dbURL, (err, db) => {
if (err) {
throw err;
} else {
console.log('Connection successful');
var dataBase = db.db();
// loop forEach
dataBase.collection('jobs').find().forEach(function(myDoc){
console.log('There is a job called :'+ myDoc.job +'in Database')})
});
我寻找了一个性能良好的解决方案,最终我创造了一个混合体,结合了我发现的东西,我认为它运行良好:
/**
* This method will read the documents from the cursor in batches and invoke the callback
* for each batch in parallel.
* IT IS VERY RECOMMENDED TO CREATE THE CURSOR TO AN OPTION OF BATCH SIZE THAT WILL MATCH
* THE VALUE OF batchSize. This way the performance benefits are maxed out since
* the mongo instance will send into our process memory the same number of documents
* that we handle in concurrent each time, so no memory space is wasted
* and also the memory usage is limited.
*
* Example of usage:
* const cursor = await collection.aggregate([
{...}, ...],
{
cursor: {batchSize: BATCH_SIZE} // Limiting memory use
});
DbUtil.concurrentCursorBatchProcessing(cursor, BATCH_SIZE, async (doc) => ...)
* @param cursor - A cursor to batch process on.
* We can get this from our collection.js API by either using aggregateCursor/findCursor
* @param batchSize - The batch size, should match the batchSize of the cursor option.
* @param callback - Callback that should be async, will be called in parallel for each batch.
* @return {Promise<void>}
*/
static async concurrentCursorBatchProcessing(cursor, batchSize, callback) {
let doc;
const docsBatch = [];
while ((doc = await cursor.next())) {
docsBatch.push(doc);
if (docsBatch.length >= batchSize) {
await PromiseUtils.concurrentPromiseAll(docsBatch, async (currDoc) => {
return callback(currDoc);
});
// Emptying the batch array
docsBatch.splice(0, docsBatch.length);
}
}
// Checking if there is a last batch remaining since it was small than batchSize
if (docsBatch.length > 0) {
await PromiseUtils.concurrentPromiseAll(docsBatch, async (currDoc) => {
return callback(currDoc);
});
}
}
读取并更新多个大型文档的使用示例:
const cursor = await collection.aggregate([
{
...
}
], {
cursor: {batchSize: BATCH_SIZE}, // Limiting memory use
allowDiskUse: true
});
const bulkUpdates = [];
await DbUtil.concurrentCursorBatchProcessing(cursor, BATCH_SIZE, async (doc: any) => {
const update: any = {
updateOne: {
filter: {
...
},
update: {
...
}
}
};
bulkUpdates.push(update);
// Updating if we read too many docs to clear space in memory
await this.bulkWriteIfNeeded(bulkUpdates, collection);
});
// Making sure we updated everything
await this.bulkWriteIfNeeded(bulkUpdates, collection, true);
...
private async bulkWriteParametersIfNeeded(
bulkUpdates: any[], collection: any,
forceUpdate = false, flushBatchSize) {
if (bulkUpdates.length >= flushBatchSize || forceUpdate) {
// concurrentPromiseChunked is a method that loops over an array in a concurrent way using lodash.chunk and Promise.map
await PromiseUtils.concurrentPromiseChunked(bulkUpsertParameters, (upsertChunk: any) => {
return techniquesParametersCollection.bulkWrite(upsertChunk);
});
// Emptying the array
bulkUpsertParameters.splice(0, bulkUpsertParameters.length);
}
}