MongoDb实时(或准实时)流式传输插入的数据

6

我有许多MongoDB集合,这些集合从各种流数据源中获取许多JSON文档。换句话说,有许多进程不断将数据插入一组MongoDB集合。

我需要一种将数据从MongoDB流式传输到下游应用程序的方法。因此,我希望实现一个概念上类似于以下结构的系统:

App Stream1 --> 
App Stream2 -->     MONGODB     --->  Aggregated Stream
App Stream3 -->

或者这样:

App Stream1 -->                 --->  MongoD Stream1
App Stream2 -->     MONGODB     --->  MongoD Stream2
App Stream3 -->                 --->  MongoD Stream3

问题是如何从Mongo中流式传输数据,而不必持续轮询/查询数据库?
显而易见的解决方法是“为什么不将这些应用程序流处理过程更改为向队列(如Rabbit、Zero或ActiveMQ)发送消息,然后将它们一次性发送到您的Mongo流处理过程和Mongo中,就像这样:”
                 MONGODB
                   /|\  
                    |
App Stream1 -->     |          --->  MongoD Stream1
App Stream2 -->  SomeMQqueue   --->  MongoD Stream2
App Stream3 -->                --->  MongoD Stream3

在理想的情况下,是的,这样做很好,但我们需要Mongo先确保消息被保存,以避免重复,并确保生成所有ID等。Mongo必须作为持久层位于中间。
那么我如何从Mongo集合(不使用GridFS等)中流式传输消息到这些下游应用程序?基本思路是仅轮询新文档,每个收集到的文档都通过向存储在数据库中的JSON文档添加另一个字段进行更新,就像SQL表中存储的处理时间戳的过程标志一样。也就是说,每隔1秒钟轮询未处理的文档...添加processed = now()...更新文档。
是否有更简洁/计算效率更高的方法?
FYI-这些都是Java进程。
干杯!
1个回答

3
如果你要写入一个已经有容量限制的集合(或多个集合),可以使用可遍历游标将新数据推送到流中,或者推送到消息队列中以便于从中进行流式传输。但是这对于非容量限制的集合不起作用。

谢谢提供链接。遗憾的是没有使用封顶集合,但对于消息服务来说这也不是一个坏特性。听起来在处理标志和轮询上建立索引是唯一的选择...如果索引项为空,它是否仍然被索引引用,或者查询空值是否仍然意味着集合扫描? - NightWolf
1
或者我们可以创建一个有固定大小的限制集合,像缓存一样运作,然后逐个取出项目并将它们放回到普通集合中。那么问题就变成了如何在应用程序运行之间保存我们的位置光标?我假设我们只需使用Mongo自动生成的_id字段,并选择大于该ID字段的所有内容... 所有Mongo生成的_id是否都是按顺序递增的? - NightWolf
1
索引确实会存储null条目。如果您正在跟踪一个固定大小的集合,您需要存储您看到的最后一个条目(您可以使用另一个Mongo集合来存储它),然后使用$minskip(1)从该元素开始恢复您的可追溯游标。请参阅http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max。 - dcrosta
1
此外,请注意确保您的已限制集合足够大,以容纳在应用程序尾随该集合期间可能接收到的所有文档。如果限制集合太小,则可能会从队列的角度丢失文档。 - dcrosta
1
有办法解释 oplog (它也是一个封顶集合),但这必须在主服务器上完成,但我还没有深入研究过。也许值得一看。顺便说一下,请保持我们的最新消息,因为这是一个有趣的问题。 - lobster1234
还可以查看我的博客:http://www.warski.org/blog/2012/11/event-streaming-with-mongodb/ - adamw

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接