我有许多MongoDB集合,这些集合从各种流数据源中获取许多JSON文档。换句话说,有许多进程不断将数据插入一组MongoDB集合。
我需要一种将数据从MongoDB流式传输到下游应用程序的方法。因此,我希望实现一个概念上类似于以下结构的系统:
App Stream1 -->
App Stream2 --> MONGODB ---> Aggregated Stream
App Stream3 -->
或者这样:
App Stream1 --> ---> MongoD Stream1
App Stream2 --> MONGODB ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3
问题是如何从Mongo中流式传输数据,而不必持续轮询/查询数据库?
显而易见的解决方法是“为什么不将这些应用程序流处理过程更改为向队列(如Rabbit、Zero或ActiveMQ)发送消息,然后将它们一次性发送到您的Mongo流处理过程和Mongo中,就像这样:”
MONGODB
/|\
|
App Stream1 --> | ---> MongoD Stream1
App Stream2 --> SomeMQqueue ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3
在理想的情况下,是的,这样做很好,但我们需要Mongo先确保消息被保存,以避免重复,并确保生成所有ID等。Mongo必须作为持久层位于中间。
那么我如何从Mongo集合(不使用GridFS等)中流式传输消息到这些下游应用程序?基本思路是仅轮询新文档,每个收集到的文档都通过向存储在数据库中的JSON文档添加另一个字段进行更新,就像SQL表中存储的处理时间戳的过程标志一样。也就是说,每隔1秒钟轮询未处理的文档...添加processed = now()...更新文档。
是否有更简洁/计算效率更高的方法?
FYI-这些都是Java进程。
干杯!
null
条目。如果您正在跟踪一个固定大小的集合,您需要存储您看到的最后一个条目(您可以使用另一个Mongo集合来存储它),然后使用$min
和skip(1)
从该元素开始恢复您的可追溯游标。请参阅http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max。 - dcrosta