在CQRS读取端处理乱序事件

18

我读了Jonathan Oliver关于处理乱序事件的好文章。

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

我们使用的解决方案是将消息出队并将其放入“持有表”中,直到收到所有具有先前顺序的消息为止。当所有先前的消息都已收到时,我们将从保持表中取出所有消息,并按顺序通过适当的处理程序运行它们。一旦所有处理程序都被成功执行,我们就会从保持表中删除消息并提交更新以更新读模型。

对我们来说,这有效,因为域发布事件并标记它们具有适当的顺序号。如果没有这个,下面的解决方法将更加困难,甚至不可能。

该解决方案使用关系数据库作为持久性存储机制,但我们没有使用存储引擎的任何关系方面。同时,这其中有一个注意事项。如果消息2、3和4到达,但消息1从未到达,我们不会应用它们。这种情况只应在处理消息1时发生错误或消息1被某种方式丢失时发生。幸运的是,很容易通过修正消息处理程序中的任何错误并重新运行消息来纠正任何错误。或者,在消息丢失的情况下,可以直接从事件存储库重新构建读模型。

我有几个问题,特别是关于他如何说我们可以随时向事件存储库请求缺少的事件。

  1. 在CQRS的写侧是否必须公开一个服务供读取侧“要求”重放事件?例如,如果未收到事件1但已收到2、4、3,我们是否可以通过服务向事件存储库请求重新发布事件,从1开始?
  2. 这项服务是CQRS架构中的写入端责任吗?
  3. 我们如何使用它重新构建读模型?

我们使用了RabbitMq的“重试”方法,效果很好。如果经过多次重试仍无法解决问题,您只需将此事件放入死信队列并重置序列号,以便进一步处理事件。通常,在您的应用程序中,事件顺序错乱的原因是什么? - IlliakaillI
我有一些特定的命令,可以生成多个事件。我还没有实现任何东西,但是担心可能会出现乱序事件。我的事件发布者也是异步工作的。因此,有可能某些事件的发布顺序也不正确。我依靠我的事件序列号来帮助我重新组合它们。我将尝试重试方法。如果您能详细说明一下,我可以将其标记为答案。 - Dasith Wijes
我在我的回答评论部分添加了更详细的解释。 - IlliakaillI
3个回答

5
如果您有一个序列号,那么您就可以检测到当前事件是否顺序错误,例如currentEventNumber != lastReceivedEventNumber + 1。
一旦您检测到这种情况,您就可以抛出异常。如果您的订阅者有“重试”的机制,它会在一秒钟左右再次尝试处理此事件。在此期间,较早的事件很可能已经被处理,并且序列将是正确的。如果乱序事件很少发生,这就是一个解决方案。
如果您经常面临这种情况,您需要实现全局锁定机制,以允许某些事件按顺序处理。例如,我们在MSSQL中使用sp_getapplock来实现特定场景下的全局“关键部分”行为。Apache ZooKeeper提供了一个框架来应对更复杂的情况,当分布式应用程序的多个部分需要比简单锁更复杂的东西时。

我正在研究某些多人游戏如何处理这种情况。该游戏具有内置缓存,持续约100毫秒。它在应用事件之前等待100毫秒,以防有先前的事件丢失。由于可能存在潜在的可扩展性问题,我有点犹豫使用任何锁。顺便问一下,在您的实现中,您如何向事件存储请求缺失的事件? - Dasith Wijes
1
如果您想使此系统更加健壮,就必须非常小心地处理聚合端的各种缓存。如果您的应用程序突然失败会发生什么?根据我的经验,使用缓存事件的方法无法扩展。在我们的业务案例中,我们正在构建分布式24/7容错服务器,这意味着您必须在不同的物理机器上至少拥有2个聚合器进程实例。如果您想避免出现脑裂情况,您必须考虑同时运行3个独立实例。 - IlliakaillI
1
你在实现中如何向事件存储请求遗失的事件? 我们之前使用了RabbitMQ,后来转到了Azure Service Bus。这两个服务均提供交付保证功能。基本上,在聚合器端事务结束时,您会让队列服务知道您的事件已经成功处理。 - IlliakaillI
2
使用手动确认模式来接收消息。如果您的序列号无效,只需抛出异常,这意味着不会发生确认,这意味着消息将被重新传递。您可以通过在客户端实现中捕获-休眠-重新抛出未处理的异常来调整延迟。这是确认发生的方式在dotnet中。屏幕截图来自此教程页面 - IlliakaillI
1
语句 currentEventNumber != lastReceivedEventNumber + 1; 并不完全正确。考虑这样一种情况,即当 lastReceivedEventNumber = 5,currentEventNumber = 8 时,这是完全有效的情况。这是否绝对意味着版本为6和7的事件尚未到达?它们实际上应该到达吗?并不是这样的。 - Cristian E.
显示剩余2条评论

1
基于时间戳的解决方案:

传入的消息为:

{
 id: 1,
 timestamp: T2,
 name: Samuel
}
{
 id: 1,
 timestamp: T1,
 name: Sam,
 age: 26
}
{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 contact: 123
}

无论在数据库中的顺序如何,我们希望看到的是:

{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 age: 26,
 contact: 123
}

对于每个传入的消息,请执行以下操作:
  1. 获取已保存的记录并评估时间戳。
  2. 哪个时间戳更大,就选哪个为目标。

现在让我们来看看消息:

  1. T2 先到达:将其存储在数据库中,因为它是第一个。
  2. T1 接下来到达:持久化的(T2)和接收的(T1),所以 T2 是目标。
  3. T3 到达:持久化的(T2)和接收的(T1),所以 T3 是目标。

以下的 deepMerge(src, target) 应该能够给我们结果:

public static JsonObject deepMerge(JsonObject source, JsonObject target) {
    for (String key: source.keySet()) {
        JsonElement srcValue = source.get(key);
        if (!target.has(key)) { // add only when target doesn't have it already
            target.add(key, srcValue);
        } else {
            // handle recursively according to the requirement

        }
    }
    return target;
}

如果您需要完整版本的deepMerge(),请在评论中告诉我。


0

另一种选择是将从服务(S1)读取事件的方式进行调整,使其只能向您的服务(S2)生成有序事件。

例如,如果有许多不同会话的事件要处理,可以在前端设置一个排序服务(O1)来负责排序。它确保每个会话只传递一个事件到(S1),并且只有当(S1)和(S2)都成功处理了该事件时,(O1)才允许该会话的新事件通过(S1)。为了提高性能,还可以加入一些排队机制。


考虑一下,前端服务(O2)只需为传递的事件打上该会话的版本标记,然后下游服务就有了完成每个会话/版本对所需的一切。无论在远端的任何可能关心版本的地方,都可以确保它使用最新的版本。 - andrew pate
你所描述的情况更适合使用Actor模型设计模式来处理。Service Fabric和Akka.net都有可以完成这一任务的演员。 - Dasith Wijes
Dasith。我最近看了一下ServiceFabric,真的很喜欢它的可靠集合(微软做得很好),是的,我提到的会话可以作为Actor实现。虽然我不完全确定使用ServiceFabric,如果我在Actor上调用大量异步方法,那么这些任务的执行顺序是否总是保持有序。 - andrew pate

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接