如何在使用Node.js的MongoDB中使用cursor.forEach()?

64

我在数据库中有大量文件,想知道如何遍历所有文档并更新它们,每个文档都有不同的值。


2
这取决于您使用的连接到MongoDB的驱动程序。 - Leonid Beschastny
我正在使用 MongoDB 驱动程序。 - Alex Brodov
你能给我一些使用forEach()内部update的例子,并指定在哪里关闭与数据库的连接,因为我在这方面遇到了问题吗? - Alex Brodov
10个回答

135
答案取决于您使用的驱动程序。我知道的所有MongoDB驱动程序都以某种方式实现了cursor.forEach()
以下是一些示例:

node-mongodb-native

collection.find(query).forEach(function(doc) {
  // handle
}, function(err) {
  // done or error
});

mongojs

db.collection.find(query).forEach(function(err, doc) {
  // handle
});

和尚

collection.find(query, { stream: true })
  .each(function(doc){
    // handle doc
  })
  .error(function(err){
    // handle error
  })
  .success(function(){
    // final callback
  });

mongoose

collection.find(query).stream()
  .on('data', function(doc){
    // handle doc
  })
  .on('error', function(err){
    // handle error
  })
  .on('end', function(){
    // final callback
  });

.forEach回调中更新文档

.forEach回调中更新文档的唯一问题是您无法确定何时更新所有文档。

为解决此问题,您应该使用一些异步控制流解决方案。以下是一些选项:

以下是使用async的示例,使用其queue特性

var q = async.queue(function (doc, callback) {
  // code for your update
  collection.update({
    _id: doc._id
  }, {
    $set: {hi: 'there'}
  }, {
    w: 1
  }, callback);
}, Infinity);

var cursor = collection.find(query);
cursor.each(function(err, doc) {
  if (err) throw err;
  if (doc) q.push(doc); // dispatching doc to async.queue
});

q.drain = function() {
  if (cursor.isClosed()) {
    console.log('all items have been processed');
    db.close();
  }
}

我可以在其中使用一个更新函数,该函数将从for each中获取doc._id作为查询吗? - Alex Brodov
1
可以的,但是在关闭连接之前,您需要等待所有更新操作完成。 - Leonid Beschastny
1
您能否添加一个 Promise 示例来迭代 .forEach - chovy
1
就目前而言,node-mongodb-native现在有一个名为forEach的API,它接受一个回调函数(http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#forEach)。 - Richard
6
给mongoose的一个小提示 - .stream 方法已被弃用,现在我们应该使用 .cursor 方法。 - Alex K
显示剩余7条评论

39

使用 mongodb 驱动程序和现代 NodeJS 的异步/等待,一个好的解决方案是使用 next() 方法:

const collection = db.collection('things')
const cursor = collection.find({
  bla: 42 // find all things where bla is 42
});
let document;
while ((document = await cursor.next())) {
  await collection.findOneAndUpdate({
    _id: document._id
  }, {
    $set: {
      blu: 43
    }
  });
}

这将导致一次只需要在内存中处理一个文档,而不像被接受的答案那样,在处理文档之前,许多文档都会进入内存。在“大型集合”(如问题所述)的情况下,这可能很重要。

如果文档很大,可以通过使用投影来进一步改善,以便仅从数据库获取所需文档的字段。


3
@ZachSmith:这个方法是正确的,但速度非常慢。可以通过使用bulkWrite来提高速度10倍或更多。 - Dan Dascalescu
1
有一个名为 hasNext 的方法。应该使用它来代替 while(document=....) 的模式。 - transang
1
@ZivGlazer,这不是我想说的:所呈现的代码仅需要一次在内存中加载一个文档。您提到的批处理大小确定了从数据库中获取多少个文档的请求,并且这决定了任何时候将有多少个文档驻留在内存中,因此在这方面您是正确的;但是,批处理大小为1可能不是一个好主意...找到一个好的批处理大小是一个不同的问题 :) - chris6953
我同意,我想做一个类似的读取大量大文档并更新每个已读文档的过程,最终我实现了这样的功能: - Ziv Glazer
我将我的代码以更易读的形式添加到了这个问题的答案中,应该在那里更容易阅读^^ - Ziv Glazer
显示剩余4条评论

21

var MongoClient = require('mongodb').MongoClient,
    assert = require('assert');
MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {
assert.equal(err, null); console.log("成功连接到 MongoDB。");
var query = { "category_code": "生物技术" };
db.collection('companies').find(query).toArray(function(err, docs) {
assert.equal(err, null); assert.notEqual(docs.length, 0);
docs.forEach(function(doc) { console.log(doc.name + " 是一家 " + doc.category_code + " 公司。"); });
db.close();
});
});

注意,调用.toArray会导致应用程序提取整个数据集。


var MongoClient = require('mongodb').MongoClient,
    assert = require('assert');
MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {
assert.equal(err, null); console.log("成功连接到 MongoDB 数据库。");
var query = { "category_code": "biotech" };
var cursor = db.collection('companies').find(query);
function(doc) { cursor.forEach( console.log(doc.name + " 是一家 " + doc.category_code + " 公司。"); }, function(err) { assert.equal(err, null); return db.close(); } ); });
这段代码是使用Node.js和MongoDB进行数据操作的示例。在连接MongoDB服务后,查询了一个名为“biotech”类别的公司,并通过forEach()方法遍历了所有匹配的公司文档,输出每个公司名称以及其所属的类别代码。注意,find() 返回的游标被分配给 var cursor。采用这种方法,我们将数据流到应用程序,而不是一次性在内存中获取所有数据并消耗数据。由于find() 实际上直到我们尝试使用它提供的一些文档之前不会向数据库发出请求,因此可以立即创建一个游标。 cursor 的目的是描述我们的查询。 cursor.forEach 的第二个参数表示当驱动程序耗尽数据或发生错误时要执行的操作。
在以上代码的初始版本中,toArray() 强制进行了数据库调用。这意味着我们需要所有文档,并希望它们以数组形式返回。
此外,MongoDB 以批处理格式返回数据。下面的图像显示,来自游标(从应用程序)到MongoDB 的请求。

MongoDB游标请求

forEachtoArray更好,因为我们可以在文档到达之前进行处理,直到我们到达末尾。与toArray相反-我们要等待所有文档都被检索并建立整个数组。这意味着我们没有从驱动程序和数据库系统共同批处理结果到应用程序中获得任何优势。批处理旨在提供内存开销和执行时间的效率。如果您可以在应用程序中利用它,请充分利用。

2
光标在客户端处理数据时会导致额外的问题。由于整个系统是异步的,因此数组更容易消耗。 - DragonFire

10

之前的回答都没有提到批量更新,这使得它们非常缓慢 - 比使用bulkWrite的解决方案要慢几十倍或几百倍。

假设你想将每个文档中的一个字段的值加倍,以下是如何快速完成且内存消耗固定的方法:

// Double the value of the 'foo' field in all documents
let bulkWrites = [];
const bulkDocumentsSize = 100;  // how many documents to write at once
let i = 0;
db.collection.find({ ... }).forEach(doc => {
  i++;

  // Update the document...
  doc.foo = doc.foo * 2;

  // Add the update to an array of bulk operations to execute later
  bulkWrites.push({
    replaceOne: {
      filter: { _id: doc._id },
      replacement: doc,
    },
  });

  // Update the documents and log progress every `bulkDocumentsSize` documents
  if (i % bulkDocumentsSize === 0) {
    db.collection.bulkWrite(bulkWrites);
    bulkWrites = [];
    print(`Updated ${i} documents`);
  }
});
// Flush the last <100 bulk writes
db.collection.bulkWrite(bulkWrites);

5

这里是使用Mongoose游标异步与承诺的示例:

new Promise(function (resolve, reject) {
  collection.find(query).cursor()
    .on('data', function(doc) {
      // ...
    })
    .on('error', reject)
    .on('end', resolve);
})
.then(function () {
  // ...
});

参考资料:


如果您有一个相当大的文档列表,这会不会耗尽内存?(例如:1000万个文档) - chovy
@chovy 理论上不应该,这就是为什么你首先使用游标而不是将所有内容加载到数组中的原因。承诺只有在游标结束或出现错误后才得以实现。如果您确实拥有这样的数据库,那么测试这一点应该不太困难,我自己也很好奇。 - Wtower
你可以使用限制来避免获取10M个文档,我不知道有任何人能够一次阅读10M个文档,或者有一个屏幕可以在一页上呈现它们(无论是手机、笔记本电脑还是台式机 - 忘记手表吧)。 - DragonFire

5

现在你可以在异步函数中使用:

for await (let doc of collection.find(query)) {
  await updateDoc(doc);
}

// all done

它很好地序列化了所有更新。


doc 可以是 const 因为它的作用域在循环内部。 - spikyjt

4

你能提供一个在所有结果处理完后触发回调的例子吗? - chovy
forEach(iteratorCallback, endCallback) 中,当没有更多数据时(error 为未定义),会调用 endCallback(error)。@chovy - Tien Do

4

Leonid的回答很好,但我希望加强使用Async/promises的重要性,并提供一个使用promises的不同解决方案示例。

这个问题的最简单解决方案是在forEach循环中调用更新。通常,您不需要在每个请求后关闭db连接,但如果确实需要关闭连接,则要小心。只有确定所有更新已经执行完毕,才能关闭它。

这里的一个常见错误是在分派所有更新后调用db.close(),而不知道它们是否已完成。如果这样做,将会出现错误。

错误的实现方式:

collection.find(query).each(function(err, doc) {
  if (err) throw err;

  if (doc) {
    collection.update(query, update, function(err, updated) {
      // handle
    });
  } 
  else {
    db.close(); // if there is any pending update, it will throw an error there
  }
});

然而,db.close()也是一个异步操作(参考 其回调选项),你可能很幸运,这段代码可以无误地完成。但只有在需要更新小集合中的少量文档时才能正常工作(因此请勿尝试)。


正确解决方案:

由于已经有人提出了使用异步的解决方案 (Leonid),下面是使用 Q promises 的解决方案。

var Q = require('q');
var client = require('mongodb').MongoClient;

var url = 'mongodb://localhost:27017/test';

client.connect(url, function(err, db) {
  if (err) throw err;

  var promises = [];
  var query = {}; // select all docs
  var collection = db.collection('demo');
  var cursor = collection.find(query);

  // read all docs
  cursor.each(function(err, doc) {
    if (err) throw err;

    if (doc) {

      // create a promise to update the doc
      var query = doc;
      var update = { $set: {hi: 'there'} };

      var promise = 
        Q.npost(collection, 'update', [query, update])
        .then(function(updated){ 
          console.log('Updated: ' + updated); 
        });

      promises.push(promise);
    } else {

      // close the connection after executing all promises
      Q.all(promises)
      .then(function() {
        if (cursor.isClosed()) {
          console.log('all items have been processed');
          db.close();
        }
      })
      .fail(console.error);
    }
  });
});

当我尝试这个时,对于行var promises = calls.map(myUpdateFunction(doc));,我得到了错误doc未定义 - Philip O'Brien
1
@robocode,谢谢!我已经修复了错误,现在示例正常工作。 - Zanon
1
这种方法需要您在处理结果之前将整个数据集加载到内存中。对于大型数据集来说,这种方式并不实用(您会耗尽内存)。您可以使用 .toArray(),这样更简单(当然也需要加载整个数据集)。 - UpTheCreek
@UpTheCreek,感谢您的反馈。我已经更新了我的答案,现在只存储promise对象而不是doc对象。这样做可以节省大量内存,因为promises对象是一个小的JSON,用于存储状态。 - Zanon
@UpTheCreek,关于内存使用的更好解决方案,您需要执行两次查询:首先进行“计数”,然后再获取游标。每次更新完成后,您将增加一个变量,并且只有在此变量达到结果总数时才停止程序(并关闭db连接)。 - Zanon
显示剩余5条评论

1
假设我们已经有了以下MongoDB数据。
Database name: users
Collection name: jobs
===========================
Documents
{ "_id" : ObjectId("1"), "job" : "Security", "name" : "Jack", "age" : 35 }
{ "_id" : ObjectId("2"), "job" : "Development", "name" : "Tito" }
{ "_id" : ObjectId("3"), "job" : "Design", "name" : "Ben", "age" : 45}
{ "_id" : ObjectId("4"), "job" : "Programming", "name" : "John", "age" : 25 }
{ "_id" : ObjectId("5"), "job" : "IT", "name" : "ricko", "age" : 45 }
==========================

这段代码:

var MongoClient = require('mongodb').MongoClient;
var dbURL = 'mongodb://localhost/users';

MongoClient.connect(dbURL, (err, db) => {
    if (err) {
        throw err;
    } else {
        console.log('Connection successful');
        var dataBase = db.db();
        // loop forEach
        dataBase.collection('jobs').find().forEach(function(myDoc){
        console.log('There is a job called :'+ myDoc.job +'in Database')})
});

0

我寻找了一个性能良好的解决方案,最终我创造了一个混合体,结合了我发现的东西,我认为它运行良好:

/**
 * This method will read the documents from the cursor in batches and invoke the callback
 * for each batch in parallel.
 * IT IS VERY RECOMMENDED TO CREATE THE CURSOR TO AN OPTION OF BATCH SIZE THAT WILL MATCH
 * THE VALUE OF batchSize. This way the performance benefits are maxed out since
 * the mongo instance will send into our process memory the same number of documents
 * that we handle in concurrent each time, so no memory space is wasted
 * and also the memory usage is limited.
 *
 * Example of usage:
 * const cursor = await collection.aggregate([
     {...}, ...],
     {
        cursor: {batchSize: BATCH_SIZE} // Limiting memory use
    });
 DbUtil.concurrentCursorBatchProcessing(cursor, BATCH_SIZE, async (doc) => ...)
 * @param cursor - A cursor to batch process on.
 * We can get this from our collection.js API by either using aggregateCursor/findCursor
 * @param batchSize - The batch size, should match the batchSize of the cursor option.
 * @param callback - Callback that should be async, will be called in parallel for each batch.
 * @return {Promise<void>}
 */
static async concurrentCursorBatchProcessing(cursor, batchSize, callback) {
    let doc;
    const docsBatch = [];

    while ((doc = await cursor.next())) {
        docsBatch.push(doc);

        if (docsBatch.length >= batchSize) {
            await PromiseUtils.concurrentPromiseAll(docsBatch, async (currDoc) => {
                return callback(currDoc);
            });

            // Emptying the batch array
            docsBatch.splice(0, docsBatch.length);
        }
    }

    // Checking if there is a last batch remaining since it was small than batchSize
    if (docsBatch.length > 0) {
        await PromiseUtils.concurrentPromiseAll(docsBatch, async (currDoc) => {
            return callback(currDoc);
        });
    }
}

读取并更新多个大型文档的使用示例:

        const cursor = await collection.aggregate([
        {
            ...
        }
    ], {
        cursor: {batchSize: BATCH_SIZE}, // Limiting memory use 
        allowDiskUse: true
    });

    const bulkUpdates = [];

    await DbUtil.concurrentCursorBatchProcessing(cursor, BATCH_SIZE, async (doc: any) => {
        const update: any = {
            updateOne: {
                filter: {
                    ...
                },
                update: {
                   ...
                }
            }
        };            

        bulkUpdates.push(update);

        // Updating if we read too many docs to clear space in memory
        await this.bulkWriteIfNeeded(bulkUpdates, collection);
    });

    // Making sure we updated everything
    await this.bulkWriteIfNeeded(bulkUpdates, collection, true);

...

    private async bulkWriteParametersIfNeeded(
    bulkUpdates: any[], collection: any,
    forceUpdate = false, flushBatchSize) {

    if (bulkUpdates.length >= flushBatchSize || forceUpdate) {
        // concurrentPromiseChunked is a method that loops over an array in a concurrent way using lodash.chunk and Promise.map
        await PromiseUtils.concurrentPromiseChunked(bulkUpsertParameters, (upsertChunk: any) => {
            return techniquesParametersCollection.bulkWrite(upsertChunk);
        });

        // Emptying the array
        bulkUpsertParameters.splice(0, bulkUpsertParameters.length);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接