CQRS,事件溯源和扩展性

7
很明显,基于这些模式的系统易于扩展。但我想问一下,具体是如何实现的呢?我有几个关于可扩展性的问题:
  1. 如何扩展聚合根?如果我创建多个聚合根 A实例,如何同步它们?如果其中一个实例处理命令并创建事件,则应将此事件传播到该聚合根的每个实例吗?
  2. 难道不应该存在一些业务逻辑来确定请求哪个聚合根实例吗?因此,如果我发出多个命令,这些命令都适用于聚合根A(订单)并适用于一个特定订单,则将其传递给同一个实例是有意义的。或者?
在这篇文章中:https://initiate.andela.com/event-sourcing-and-cqrs-a-look-at-kafka-e0c1b90d17d8,他们使用了带有分区的Kafka。因此,用户管理服务 - 聚合根被扩展,但仅订阅特定主题分区,该分区包含特定用户的所有事件。
谢谢!
3个回答

6
如何扩展聚合?
- 仔细选择聚合,确保您的命令在许多聚合之间合理分布。您不想有一个聚合,可能会从并发用户接收大量命令。 - 序列化发送到聚合实例的命令。这可以通过聚合存储库和命令总线/队列来完成。但对我而言,最简单的方法是使用聚合版本控制进行乐观锁定,如this post by Michiel Rook所述。
请求聚合的哪个实例?
在我们的reSolve framework中,我们在每个命令上创建聚合实例,并且不在请求之间保留它。这种方法非常快速 - 比在集群中查找正确的聚合实例要快得多,因为它更快地获取100个事件并将其减少到聚合状态。
这种方法是可扩展的,可以让您无服务器 - 每个命令一个lambda调用,没有共享状态。那些聚合具有太多事件的罕见情况可以通过快照解决。

它是可配置的,实际上取决于事件存储数据库和事件大小。但是100-200可能是一个合理的默认值。 - Roman Eremin
1
首先,不应该让来自并发用户的千条命令同时作用于同一个聚合物,应该将其缩小。 如果两个命令同时作用于同一个聚合物 - 第一个完成的 Lambda 将获胜,而第二个在尝试为相同的聚合版本保存事件时将会出现乐观异常。它所需要做的就是再次应用命令。Lambda 不必互相通信。 - Roman Eremin
我有一个问题想问。您是否在Lambda中,在命令处理程序内执行一些命令验证?以异步方式 - 通过消息传递?如何实现?您只是听取答案,然后再次调用Lambda并返回处理吗? - Ondrej Tomcik
1
在发送命令之前应该完成这个步骤。DDD 假设聚合应该能够验证所有业务规则,而不需要与其他聚合进行交流。这是一篇好的文章:http://danielwhittaker.me/2014/11/22/4-secrets-inter-aggregate-communication-event-sourced-system/ - Roman Eremin
最常见的例子是“如何确保用户名唯一?”社区共识是,在发送CREATE_USER命令之前,您应该在客户端(使用读模型)上检查此问题,并且如果某种方式两个具有相同用户名的用户成功注册,则应在构建用户列表时(在读取方面)捕获此问题并手动处理或使用saga(例如禁用第二个用户)。 - Roman Eremin
显示剩余5条评论

5
如何扩展聚合?
聚合实例由其事件流表示。每个聚合实例都有自己的事件流。来自一个聚合实例的事件不会被其他聚合实例使用。例如,如果具有ID = 1的订单聚合创建了ID = 1001的OrderWasCreated事件,则该事件永远不会用于重建其他订单聚合实例(ID = 2,3,4 ...)。
话虽如此,您可以通过基于聚合ID在事件存储上创建分片来水平扩展聚合。
如果我创建多个聚合A的实例,如何同步它们?如果其中一个实例处理命令并创建事件,则该事件应传播到该聚合的每个实例吗?
你不能。每个聚合实例完全与其他实例分离。
为了能够水平扩展命令的处理,建议每次从事件存储加载聚合实例,通过重放其先前生成的所有事件。有一种优化可以提高性能:聚合快照,但只有在确实需要时才建议这样做。 这个答案可能会有所帮助。

是否应该存在一些业务逻辑来决定请求哪个聚合实例?因此,如果我发出多个命令,这些命令适用于聚合A(订单),并且适用于一个特定的订单,则将其传递到同一实例是有意义的。对吗?

您假设聚合实例在某些服务器的RAM上持续运行。您可以这样做,但这样的架构非常复杂。例如,当其中一个服务器关闭并且必须由其他服务器替换时会发生什么?很难确定哪些实例在那里运行并重新启动它们。相反,您可以拥有许多无状态服务器,可以处理任何聚合实例的命令。当命令到达时,您可以通过重放所有先前事件从事件存储中加载聚合ID,然后可以执行该命令。在将新事件持久化到事件存储后执行命令,可以丢弃聚合实例。下一个到达相同聚合实例的命令可以由任何其他无状态服务器处理。因此,可扩展性仅受事件存储本身的可扩展性所限制。


你好,感谢您的回答。我现在有些困惑,不确定聚合 ID 的含义。聚合 ID 是定义聚合本身 - ORDERS?还是唯一项目 - 一个带有 ID 的订单...因为如果聚合 ID 是实体的 ID,那么我清楚地知道我只会加载属于那个特定订单的事件。在这种情况下,创建订单的1000个并行用户只是不同的1000个聚合体,对吗? - Ondrej Tomcik
还有一个问题,如果我需要在聚合中进行一些验证,那么对系统中的某个读模型进行验证是否有效?还是答案是否定的,我应该重新考虑我的边界。 - Ondrej Tomcik
2
@OndrejTomcik 是的,Aggregate的ID就是例如订单的ID。 - Constantin Galbenu
2
@OndrejTomcik 聚合体不得超出其边界,所以绝对不行。 - Constantin Galbenu

0
如何扩展聚合?
系统中的每个信息都有一个单一的逻辑权限。对于一个数据的多个权限会导致争用。通过创建更小的、不重叠的边界来扩展写入——每个权限都有更小的责任区域,从而扩展写入。
To borrow from your example, an example of smaller responsibilities would
be to shift from one aggregate for all ORDERS to one aggregate for _each_
ORDER.

It's analogous to the difference between having a key value store with
all ORDERS stored in a document under one key, vs each ORDER being stored
using its own key.

读取是安全的,您可以使用多个副本进行扩展。但是这些副本只是最终一致性的。这意味着如果您问“FCOJ的出价价格现在是多少?” 您可能会从每个副本获得不同的答案。或者,如果您问“FCOJ在10:09:02的出价价格是多少?” 那么每个副本将给您一个单一的答案或说“我还不知道”。

但是,如果粒度已经是每个聚合物一个命令,那在我看来并不常见,并且您确实有很多并发访问,该如何解决?如何尽可能地分散负载并保持无冲突状态?

大致草图-每个聚合物都通过可以从命令消息的内容计算出的密钥进行存储。使用该密钥执行比较和交换操作以实现对聚合物的更新。

Acquire a message
Compute the storage key
Load a versioned representation from storage
Compute a new versioned representation
Store.compare and swap the new representation for the old

为了提供额外的流量吞吐量,您可以添加更多无状态计算。
为了提供存储吞吐量,您可以将密钥分布在更多的存储设备上。
路由层可用于将消息分组 - 路由器使用与之前相同的存储密钥计算,但使用它来选择将消息转发到计算农场中的哪个位置。然后,计算可以检查它接收到的每批消息中是否有重复的密钥,并一起处理这些消息(交换一些额外的计算以减少比较和交换的数量)。
合理的消息协议非常重要;请参阅Marc de Graauw的Nobody Needs Reliable Messaging

当然,你可以通过创建较小且不重叠的边界来扩展写入操作。但是如果粒度已经是每个聚合一个命令(在我看来并不常见),而且你有非常多的并发访问,如何解决这个问题呢?如何尽可能地分散负载并避免冲突呢? - Ondrej Tomcik

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接