如何监听 MongoDB 集合的更改?

226

我正在使用MongoDB作为数据存储,创建一种后台工作队列系统。如何在生成工作程序以处理任务之前“监听”MongoDB集合的插入?

我需要每隔几秒钟轮询以查看上次是否有任何更改,还是有办法让我的脚本等待插入发生?

这是我正在进行的PHP项目,但请用Ruby或语言不可知的方式回答。


4
变更流(Change Streams)被添加到MongoDB 3.6中以解决您的情况。https://docs.mongodb.com/manual/changeStreams/此外,如果您正在使用MongoDB Atlas,您可以利用Stitch触发器,它允许您在插入/更新/删除等操作时执行函数。 https://docs.mongodb.com/stitch/triggers/overview/ 不再需要解析oplog。 - Robert Walters
12个回答

118
你所考虑的问题听起来很像触发器。MongoDB没有对触发器提供任何支持,但是一些人使用一些技巧“自己实现了”。关键在于操作日志(oplog)。
当你在副本集中运行MongoDB时,所有的MongoDB操作都会被记录到一个操作日志中(称为oplog)。Oplog基本上只是一个修改数据的运行列表。副本集通过监听这个oplog上的变化,然后在本地应用这些变化来工作。
这听起来很熟悉吗?
我无法在此详细介绍整个过程,因为需要几页文档,但你需要的工具是可以获得的。
首先,有一些关于oplog的写作: - 简要描述 - local集合的布局(其中包含oplog)
你还需要利用可遍历游标。它们将为您提供一种监听更改而不是轮询更改的方式。请注意,复制使用可遍历游标,因此这是一项支持的功能。

1
嗯...这并不完全是我所想要的。目前我只运行了一个实例(没有从属节点)。那么也许需要一个更基本的解决方案? - Andrew
20
您可以使用--replSet选项启动服务器,它会创建/填充oplog,即使没有第二个服务器也可以。这绝对是“监听”数据库更改的唯一方法。 - Gates VP
2
这是一个不错的描述,介绍如何设置oplog以在本地记录对数据库所做的更改:http://loosexaml.wordpress.com/2012/09/03/how-to-get-a-mongodb-oplog-without-a-full-replica-set/ - johndodo
太酷了!这正是我想要的。我在npm上找到了一个名为“mongo-oplog”的库。太开心了~ - pjincz
我同意在撰写本答案时可能无法使用触发器,但对于所有来到这里的人,现在有一个可用的选项,请查看MongoDB Stitch(https://docs.mongodb.com/stitch/#stitch)和Stitch触发器(https://docs.mongodb.com/stitch/triggers/)。 - whoami - fakeFaceTrueSoul
此答案已过时。Mongodb Realm 支持触发器。 - ChatGPT

106
MongoDB有所谓的capped collectionstailable cursors,它们允许MongoDB向监听者推送数据。
一个capped collection本质上是一个固定大小且只允许插入的集合。以下是创建它的示例代码:
db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB可追踪游标(Jonathan H. Wage的原始帖子

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP(Hypertext Preprocessor)
$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python(由{{link1:Robert Stewart}})
from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl(由Max创建)
use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

附加资源:

Ruby/Node.js教程,带您创建一个应用程序,监听MongoDB capped集合中的插入。

一篇更详细介绍可追溯游标的文章。

使用可追溯游标的PHP、Ruby、Python和Perl示例。


78
睡眠1秒?真的吗?在生产代码中使用睡眠是怎么不等待的呢?这不就是轮询吗? - rbp
2
@rbp 哈哈,我从未说过这是生产代码,但你是对的,睡眠一秒钟不是一个好的做法。我很确定我从别处得到了这个例子。不确定如何重构它。 - Andrew
14
因为那些无关的细节可能会被不了解其坏处的新程序员加入到生产代码中。 - Catfish
4
我明白你的观点,但期望一些新的程序员在生产环境中添加“sleep 1”几乎是令人反感的!我的意思是,我不会感到惊讶......但如果有人把这个放到生产环境中,至少他们会从中吃一堑长一智..哈哈哈 - kroe
25
在生产环境中使用 time.sleep(1) 有什么问题? - Al Johri
显示剩余5条评论

55

看这里:变更流

2018年1月10日 - 发布3.6版

*编辑:我撰写了一篇关于如何做到这一点的文章https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


mongodb 3.6中是新功能, https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

为了使用 changeStreams,数据库必须是一个副本集

有关副本集的更多信息: https://docs.mongodb.com/manual/replication/

默认情况下,您的数据库将是一个"独立"的。

如何将独立数据库转换为副本集:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


以下示例是如何实际应用的。
* 特别适用于 Node。

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

有用的链接:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams


抱歉有这么多编辑,SO不喜欢我的“链接”(说它们格式不正确是代码)。 - Rio Weber
1
你不需要查询数据库,我认为使用watch()或类似的方法,新数据可以被发送到正在监听的服务器。 - Alexander Mills

48

自 MongoDB 3.6 开始,将会有一个名为 Change Streams 的新通知 API,您可以用它来实现此功能。参见 这篇博客文章的示例。下面是来自该博客文章的一个示例:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

4
为什么?你能详细说明吗?这是现在的标准方式吗? - Mitar
1
如何实现?不要使用轮询 - 你需要采用事件驱动的方法,而不是while循环等。 - Alexander Mills
3
你在这里看到了投票调查吗? - Mitar
1
我认为他/她指的是最后一个循环。但我认为PyMongo只支持那个。Motor可能有一个异步/事件监听器风格的实现。 - Shane Hsu

25

MongoDB版本3.6现在包括变更流,这本质上是OpLog之上的API,可以用于触发器/通知等用例。

这里有一个Java示例链接:http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

NodeJS示例可能会类似于:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });

JSON.stringify在接收Android Studio(Android应用程序)中的数据非常重要。 - DragonFire

4

或者,您可以使用标准的Mongo FindAndUpdate方法,在回调函数中运行EventEmitter事件(在Node中),当回调函数运行时触发该事件。

应用程序或架构的任何其他部分监听此事件将收到更新通知,并发送到那里的任何相关数据也会被通知。这是实现来自Mongo的通知的一种非常简单的方法。


这样做非常低效,每次查找和更新都会锁定数据库! - Yash Gupta
1
我的猜测是Alex回答了一个略微不同的问题(并非特别针对插入),但与之相关,即如何在排队作业的状态更改时触发某种通知以通知客户端,我们假设这将需要在作业生成、成功完成或失败时发生。使用WebSockets连接到节点的客户端可以通过FIndAndUpdate回调上的广播事件被通知更改,当接收到状态更改消息时可以调用该回调。我认为这并不低效,因为需要进行更新。 - Peter Scott

3
许多答案只会给出新记录而不是更新,或者效率极低。唯一可靠、高效的方法是在本地数据库上创建一个可追溯游标:oplog.rs集合,以获取MongoDB的所有更改,并对其进行处理(MongoDB甚至在内部使用此方法来支持复制!)。oplog包含什么内容的解释,请参见https://www.compose.com/articles/the-mongodb-oplog-and-node-js/。提供了一个Node.js库的示例,该库提供了一个API,可以处理oplog中可用的内容,请参见https://github.com/cayasso/mongo-oplog

3

有一组非常棒的服务可供使用,称为MongoDB Stitch。请查看Stitch函数/触发器。请注意,这是一个基于云的付费服务(AWS)。在您的情况下,您可以在插入时调用用JavaScript编写的自定义函数。

进入图像描述


https://stackoverflow.com/users/486867/manish-jain - 你有使用Stitch通知REACT应用程序数据已插入到表格中的示例吗? - MLissCetrus

1
实际上,与其观察输出,为什么不使用mongoose schema提供的中间件,在插入新内容时得到通知呢?
您可以捕获插入新文档的事件,并在插入完成后执行某些操作。

我的错。对不起先生。 - Duong Nguyen

0

这里有一个可用的Java示例,可以在这里找到。

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

这里提供的关键是查询选项

如果您不需要每次加载所有数据,还可以更改查找查询。

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接