事件溯源/ CQRS 读模型 - 投影

5
我有一个基于微服务的应用程序在AWS Lambda上运行。其中两个最重要的微服务使用事件溯源/cqrs。
背景: (这也是为了组织我的想法)
我正在使用这个库将事件存储在DynamoDB中,并将投影存储在AWS S3中。
写入部分非常完美: 每个命令调用从DynamoDB加载聚合的当前状态(通过处理程序运行事件和/或加载缓存的聚合),它根据一些业务逻辑决定接受或拒绝命令,然后使用KeyConditionExpression:'aggregateId =:a AND version >= :v'向DynamoDB写入,其中版本是已处理该聚合的事件计数。如果存在冲突,则写入失败。对我来说,这似乎是一个很好的系统!
然后,每个事件都会广播到SNS(主题名称是服务名称),以便其他服务可以根据需要对事件做出反应。
我真正困难的部分是读取。投影存储在S3中,并标记了每个事件源处理的最后一个commitId。当读取查询到来时,它会从S3中加载整个投影状态(对于所有聚合),查询所有更新的事件源,计算最新状态(再次针对所有聚合-如果它更加新,则将更新的对象写入S3),并根据查询参数返回状态的相关部分。
我的问题(之一):
我认为我的投影方法不对。大多数投影只按重要属性分组id,因此文件保持相对较小。但我还需要一种检索单个聚合的方法。使用投影似乎很疯狂,因为我每次都需要加载整个状态(即每个投影聚合),然后对其应用新事件,然后检索我想要的记录(它甚至可能没有改变)。
这是我现在正在做的事情,它的表现良好(<100k条记录),但我无法想象它会继续多久。
另一个问题是查询。我需要构建一个投影映射值,以匹配每个需要查询的属性的aggregateIds!肯定有更好的方法!
无论我如何思考这个问题,投影总是需要整个当前状态+任何新事件,才能返回一个未更改的记录。
2个回答

9
我觉得我在做投影方面出了问题。
我也这么认为,听起来你的查询和投影是耦合在一起的。
当一个读取查询进来时,它会从S3中加载所有聚合的完整投影状态,查询所有更新的事件源,计算最新的状态。
是的,听起来很混乱。更具体地说,查询似乎会触发投影执行的工作。
如果你能将查询与投影解耦,那么事情就会变得更容易。基本思想是你的查询不描述当前状态,而是描述上次投影运行时的状态。
同样的思路,不同的拼写:你从S3缓存的文档中回答查询。当检测到新事件时,你的投影运行,根据需要加载新数据,计算新文档,并替换缓存中的条目。
我想象一个三角形。
- 命令将信息从外部带入记录簿 - 投影将信息从记录簿带入缓存 - 查询将信息从缓存带到外部世界
每个三角形的边都异步运行。
我建议你从查询开始反向工作——你需要支持每个查询的文档是什么?你必须打败的延迟目标是什么?然后你开始权衡取舍——对于这个新查询,我是从现有文档中创建结果,还是需要用更细的粒度建立新文档?
如果我理解正确,我应该在事件到来时触发投影更新,而不是在查询时进行聚合。这样可以避免在每个查询上查询新事件。
是的,...事件只是一种触发方式;你也可以通过时钟(每15分钟检查一次是否需要更新)或由人工操作员决定(嗯,看起来你的账户余额过期了,让我尝试为你更新)来触发投影进程。有多种方法可以做到这一点,你可以混合和匹配策略。
我仍然需要加载整个状态,在更新投影和加载单个聚合时都是如此。
不一定。没有规定说你不能使用先前缓存的表示作为起点,然后只从记录簿中提取你需要的更改。
例如,假设您正在构建一个视图,它结合了聚合物A{id:7}B{id:9}。您获取了缓存的副本,并查看其元数据(在上一次写入时放置的位置),发现其中有一些类似于metadata:{A:{id:7, version:21}, B:{id:9, version:19}}的内容。现在,您只需要加载上次使用过的事件之后的所有事件,更新内存中的本地副本、更新元数据的本地副本并将所有内容推送到缓存即可。

非常感谢您的回答!如果我理解正确,我应该在事件到达时触发投影更新,而不是在查询时进行聚合。这样可以避免在每次查询时查询新事件的事件存储,但我仍然需要加载整个状态,无论是在更新投影时还是在加载单个聚合时。对吗?还是我漏掉了什么。 - joshblour

7

我不熟悉你的技术架构,但我实现投影的方式如下:

每个领域事件都有一个跨越所有聚合根的全局序列号。投影是具有任意名称和由该全局序列号表示的最后处理位置的读取模型。我可以随时添加新的投影以及其事件处理程序,并从0位置开始。我可以随时清除投影并将位置设置回0。我还可以结合添加一个新的投影来替换一个现有的投影,让它在建立时间长达数天后,然后删除旧的投影。

有一个服务会监视投影并使用事件存储几乎像队列一样。投影服务检查全局ID 当前位置之后的事件,并将其交给处理程序,然后更新位置。这是您的投影甚至可以过滤事件类型以提高性能的地方。

那就是基本思路。然后您的投影就是您查询的内容。一旦投影赶上了事件存储的“头”,事件就会被逐渐馈送到投影中。

如何将其转化为您的技术空间,我不太确定。如果您想获取一些想法,我正在进行一个名为Shuttle.Recall的C#实验。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接