在MongoDB游标中串行迭代(等待回调才能移动到下一个文档)

74

使用 mongoskin,我可以执行以下查询,它将返回一个游标:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

      }
}

然而,我希望能够针对每个文档调用一些异步函数,并且只有在这些函数回调后才继续处理游标中的下一个项(类似于async.js模块中的eachSeries结构)。例如:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               //externalAsyncFunction completed - now want to move to next doc
            });

      }
}  

我该如何做到这一点?

谢谢

更新:

我不想使用toArray(),因为这是批量操作,结果可能无法一次性全部放入内存中。


如果您在阻塞和等待异步函数完成之前才继续进行,那么调用它异步的意义何在? - Rotem Hermon
@RotemHermon 我别无选择!这不是我的函数,而且它是异步的。(将myAsyncFunction重命名为externalAsyncFunction...) - UpTheCreek
为什么你不使用 toArray() 然后使用递归函数来遍历结果呢? - Salman
1
@Салман - 很好的问题 - 我没有使用toArray,因为它是一个大批量操作,完整的结果可能无法放入内存中。(我会更新问题) - UpTheCreek
10个回答

94

一种更现代的方法是使用 async/await

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}

注意:

  • 异步迭代器到来时,这可能会更加简单。
  • 您可能需要添加 try/catch 进行错误检查。
  • 包含该代码的函数应该是 async 或者该代码应该被包裹在 (async function() { ... })() 中,因为它使用了 await
  • 如果需要,可以在 while 循环结束时添加 await new Promise(resolve => setTimeout(resolve, 1000));(暂停1秒)以显示它逐个处理文档。

1
完美地工作了,谢谢。不知道处理大数据集是否有任何陷阱? - FireBrand
5
好的,这是最佳解决方案,与那个只会崩溃的选择不同。 - Martijn Scheffer
你怎样在 Node 中使用它?我得到了这个错误:"SyntaxError: Unexpected identifier" 在 "cursor.hasNext()"。 - Nico
3
@Nico,抱歉回复晚了,但请看笔记中的第3点 ;) - user993683

49

如果您不想使用toArray将所有结果加载到内存中,可以使用游标进行迭代,例如以下代码。

myCollection.find({}, function(err, resultCursor) {
  function processItem(err, item) {
    if(item === null) {
      return; // All done!
    }

    externalAsyncFunction(item, function(err) {
      resultCursor.nextObject(processItem);
    });

  }

  resultCursor.nextObject(processItem);
}  

12
这种方法在我处理大规模数据集时无效。我收到了“RangeError: Maximum call stack size exceeded”的错误提示。 - Soichi Hayashi
2
@SoichiHayashi 将异步函数或回调包装在 process.nextTick 中! - zamnuts
4
跟进 @zamnuts 的问题,你上面的示例中为什么会出现堆栈溢出的情况是因为每当你处理一个项目时,你都会在当前处理函数内部运行另一个回调来处理下一个项目。随着结果集的增长,你将循环遍历更多的函数调用,并且每个函数调用都会在之前的调用堆栈上创建一个新的堆栈帧。将异步回调包装在 process.nextTicksetImmediatesetTimeout 中可以使它在下一次循环中运行,在我们创建的处理每个文档的调用堆栈的“外部”运行。 - pospi
3
cursor.forEach() 是什么? - Redsandro
3
@Redsandro - cursor.forEach()方法没有提供任何异步的方式来指示移动到下一个项目。 - UpTheCreek
显示剩余4条评论

37

自Node.js版本10.3起,您可以使用异步迭代器

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}

Jake Archibald写了一篇很棒的博客文章关于异步迭代器,我是在阅读@user993683的回答后才知道的。


寻找这个解决方案,谢谢! - F.H.

11

使用 setImmediate 可以处理大型数据集:

var cursor = collection.find({filter...}).cursor();

cursor.nextObject(function fn(err, item) {
    if (err || !item) return;

    setImmediate(fnAction, item, arg1, arg2, function() {
        cursor.nextObject(fn);
    });
});

function fnAction(item, arg1, arg2, callback) {
    // Here you can do whatever you want to do with your item.
    return callback();
}

这很棒,但你不需要在第一行使用“.cursor()”(我收到了一个错误)。 - Nico
这取决于使用的mongoose版本。这是针对较旧版本的。 - Daphoque

4

如果有人想要使用Promise来完成这个操作(而不是使用nextObject的回调函数),以下是方法。我使用的是Node v4.2.2和mongo驱动v2.1.7。这是一种类似于Cursor.forEach()的asyncSeries版本:

function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    cursor.next().then(processDoc);
  });
}

要使用此功能,需要传递游标和一个异步操作每个文档的迭代器(就像您为Cursor.forEach所做的那样)。 迭代器需要返回一个承诺,就像大多数mongodb本地驱动程序函数一样。

比如说,您想要更新集合test中的所有文档。 您可以按照以下方式执行:

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc) {    // this is the iterator
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}}       // or whatever else you need to change
    );
    // updateOne returns a promise, if not supplied a callback. Just return it.
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
})

我没有看到forEachSeries被调用在哪里。 - chovy
1
调用栈溢出。 - chovy

2
可以使用 async 库来实现这样的操作。关键点在于检查当前文档是否为 null。如果是,表示已完成。
async.series([
        function (cb) {
            cursor.each(function (err, doc) {
                if (err) {
                    cb(err);
                } else if (doc === null) {
                    cb();
                } else {
                    console.log(doc);
                    array.push(doc);
                }
            });
        }
    ], function (err) {
        callback(err, array);
    });

嗨,安东尼 - 我使用这种方法遇到的问题是,如果你需要异步地为每个记录执行某些操作,那么游标循环无法等待直到完成。(游标.each不提供“下一个”回调,因此只能在其中进行同步操作)。 - UpTheCreek

0
你可以使用Future:
myCollection.find({}, function(err, resultCursor) {
    resultCursor.count(Meteor.bindEnvironment(function(err,count){
        for(var i=0;i<count;i++)
        {
            var itemFuture=new Future();

            resultCursor.nextObject(function(err,item)){
                itemFuture.result(item);
            }

            var item=itemFuture.wait();
            //do what you want with the item, 
            //and continue with the loop if so

        }
    }));
});

0

您可以通过使用递归函数,以类似以下方式的 Array 进行迭代并获取结果。

myCollection.find({}).toArray(function (err, items) {
    var count = items.length;
    var fn = function () {
        externalAsyncFuntion(items[count], function () {
            count -= 1;
            if (count) fn();
        })
    }

    fn();
});

编辑:

这仅适用于小型数据集,对于较大的数据集,您应该使用其他答案中提到的游标。


抱歉,在评论中回答您的问题太慢了 - 由于结果集太大,我无法使用toArray。 - UpTheCreek
哦,好的。那么另一个答案适合你。 - Salman
在代码中,当条件为true时,最好避免使用这种模式,因为随着数据的增长,它可能会出现问题。 - Mark Stosberg

0
一种更现代的方法是使用for await
const cursor = db.collection("foo").find({});

for await(const doc of cursor) {
  // process doc here with await
  await processDoc(doc);
}

-2

您可以使用简单的setTimeOut。这是在Node.js上运行的TypeScript示例(我正在使用“when”模块通过承诺,但也可以不使用它们):

        import mongodb = require("mongodb");

        var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {});
        var db =  new mongodb.Db('myDb', dbServer);

        var util = require('util');
        var when = require('when'); //npm install when

        var dbDefer = when.defer();
        db.open(function() {
            console.log('db opened...');
            dbDefer.resolve(db);
        });

        dbDefer.promise.then(function(db : mongodb.Db){
            db.collection('myCollection', function (error, dataCol){
                if(error) {
                    console.error(error); return;
                }

                var doneReading = when.defer();

                var processOneRecordAsync = function(record) : When.Promise{
                    var result = when.defer();

                    setTimeout (function() {
                        //simulate a variable-length operation
                        console.log(util.inspect(record));
                        result.resolve('record processed');
                    }, Math.random()*5);

                    return result.promise;
                }

                var runCursor = function (cursor : MongoCursor){
                    cursor.next(function(error : any, record : any){
                        if (error){
                            console.log('an error occurred: ' + error);
                            return;
                        }
                        if (record){
                            processOneRecordAsync(record).then(function(r){
                                setTimeout(function() {runCursor(cursor)}, 1);
                            });
                        }
                        else{
                            //cursor up
                            doneReading.resolve('done reading data.');
                        }
                    });
                }

                dataCol.find({}, function(error, cursor : MongoCursor){
                    if (!error)
                    {
                        setTimeout(function() {runCursor(cursor)}, 1);
                    }
                });

                doneReading.promise.then(function(message : string){
                    //message='done reading data'
                    console.log(message);
                });
            });
        });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接