我的想法是使用startAtOperationTime选项,但我们无法可靠地知道所需的时间戳。此外,我不知道如何使用事务完成它。
问题是-我们如何在读取操作后精确地打开更改流而不丢失任何数据。
很棒的问题。
在不知道这些增量应用于哪个初始状态的情况下,带有发出的 delta 更改的手表是相当无用的,因此当然需要进行查找。 据我所知,Mongo 没有原子的“查找和监听”命令。
即使从问题中删除增量并仅使用fullDocument=updateLookup
选项,仍然存在同步问题。 考虑以下方法:
我们需要首先设置监听,以便在查找时捕获任何事件。乍一看,这看起来不错。如果在查找过程中发生了某些事情,并且在流中有一些更改事件,则恢复时会处理这些事件。由于在查找之前启动了监听,最坏的情况是更改流包含在查找之前发生的一些更改,可能会导致保存旧状态。但这通常并不重要,因为更改流还将包含最近的更新,并最终保存正确的状态。
但是,监听是否实际在查找之前在 Mongo 集群上运行?
查找查询是否甚至已经到达了正确同步的节点?
由于统一拓扑和连接池的概念,我认为我们不可能知道这些问题的答案。 我们不知道上述命令行将最终使用哪个连接。 如果 Mongo 离线,客户端将缓冲诸如 watch 和 find 的命令,然后在连接可用于连接池时释放它们。但是由于涉及多个连接(可能会在恰好的时刻抽喉),因此无法保证第 1 行中的 watch 命令会在第 3 行中的 find 命令之前到达 Mongo。 因此,我非常确定此答案在 99% 的情况下有效,但它并不是 100% 可靠。
您可以尝试通过检查resumeTokenChanged
事件来确认 changeStream 是否已连接,然后进行查找,但如果您的集群出现了某些节点 oplog 同步时间较长的问题,则仍可能“查找()”旧数据并进行过晚的监听。 这意味着有未处理的更新。
更新
我认为我们所能希望的最佳解决方案是结合以上所有内容,并根据所需的群集同步优雅期间添加延迟。
collection.watch()
:监听集合中文档的更改。resumeTokenChanged
事件,确保changeStream已更新到最新状态并已连接。resumeTokenChanged
事件,认为changeStream过期,意味着数据也可能过期。changeStream.pause()
:暂停从changeStream中接收更改事件。change
事件,则执行await collection.find().toArray()
操作。changeStream.resume()
:恢复从changeStream中接收更改事件。error
事件,则重新开始整个过程。