我有一个基于微服务的应用程序在AWS Lambda上运行。其中两个最重要的微服务使用事件溯源/cqrs。
背景: (这也是为了组织我的想法)
我正在使用这个库将事件存储在DynamoDB中,并将投影存储在AWS S3中。
写入部分非常完美: 每个命令调用从DynamoDB加载聚合的当前状态(通过处理程序运行事件和/或加载缓存的聚合),它根据一些业务逻辑决定接受或拒绝命令,然后使用
然后,每个事件都会广播到SNS(主题名称是服务名称),以便其他服务可以根据需要对事件做出反应。
我真正困难的部分是读取。投影存储在S3中,并标记了每个事件源处理的最后一个commitId。当读取查询到来时,它会从S3中加载整个投影状态(对于所有聚合),查询所有更新的事件源,计算最新状态(再次针对所有聚合-如果它更加新,则将更新的对象写入S3),并根据查询参数返回状态的相关部分。
我的问题(之一):
我认为我的投影方法不对。大多数投影只按重要属性分组id,因此文件保持相对较小。但我还需要一种检索单个聚合的方法。使用投影似乎很疯狂,因为我每次都需要加载整个状态(即每个投影聚合),然后对其应用新事件,然后检索我想要的记录(它甚至可能没有改变)。
这是我现在正在做的事情,它的表现良好(<100k条记录),但我无法想象它会继续多久。
另一个问题是查询。我需要构建一个投影映射值,以匹配每个需要查询的属性的aggregateIds!肯定有更好的方法!
无论我如何思考这个问题,投影总是需要整个当前状态+任何新事件,才能返回一个未更改的记录。
背景: (这也是为了组织我的想法)
我正在使用这个库将事件存储在DynamoDB中,并将投影存储在AWS S3中。
写入部分非常完美: 每个命令调用从DynamoDB加载聚合的当前状态(通过处理程序运行事件和/或加载缓存的聚合),它根据一些业务逻辑决定接受或拒绝命令,然后使用
KeyConditionExpression:'aggregateId =:a AND version >= :v'
向DynamoDB写入,其中版本是已处理该聚合的事件计数。如果存在冲突,则写入失败。对我来说,这似乎是一个很好的系统!然后,每个事件都会广播到SNS(主题名称是服务名称),以便其他服务可以根据需要对事件做出反应。
我真正困难的部分是读取。投影存储在S3中,并标记了每个事件源处理的最后一个commitId。当读取查询到来时,它会从S3中加载整个投影状态(对于所有聚合),查询所有更新的事件源,计算最新状态(再次针对所有聚合-如果它更加新,则将更新的对象写入S3),并根据查询参数返回状态的相关部分。
我的问题(之一):
我认为我的投影方法不对。大多数投影只按重要属性分组id,因此文件保持相对较小。但我还需要一种检索单个聚合的方法。使用投影似乎很疯狂,因为我每次都需要加载整个状态(即每个投影聚合),然后对其应用新事件,然后检索我想要的记录(它甚至可能没有改变)。
这是我现在正在做的事情,它的表现良好(<100k条记录),但我无法想象它会继续多久。
另一个问题是查询。我需要构建一个投影映射值,以匹配每个需要查询的属性的aggregateIds!肯定有更好的方法!
无论我如何思考这个问题,投影总是需要整个当前状态+任何新事件,才能返回一个未更改的记录。