事件溯源 - Apache Kafka + Kafka Streams - 如何确保原子性/事务性。

3
我正在使用Apache Kafka Streams评估事件溯源,以查看它在复杂场景中的可行性。与关系型数据库一样,我遇到了一些必须保证原子性/事务性的情况:
一个带有两个服务的购物应用:
- OrderService:具有Kafka Streams存储订单(OrdersStore)。 - ProductService:具有Kafka Streams存储产品及其库存(ProductStockStore)。
流程如下:
1. OrderService发布OrderCreated事件(包含productId、orderId、userId信息)。 2. ProductService获取OrderCreated事件并查询其KafkaStreams Store(ProductStockStore),以检查该产品是否有库存。如果有库存,则发布OrderUpdated事件(也包含productId、orderId、userId信息)。 3. 此事件将被ProductService Kafka Stream监听,后者将对其进行处理以减少库存。
但是,假设以下情况:
1. 客户1下了一个名为order1的订单(该产品有1件库存)。 2. 同时,客户2下了另一个名为order2的订单,也是该产品(库存仍为1)。 3. ProductService处理order1并发送消息OrderUpdated以减少库存。此消息在来自order2的OrderCreated消息之后放置在主题中。 4. ProductService处理order2-OrderCreated并再次发送消息OrderUpdated以再次减少库存。这是不正确的,因为它会引入不一致性(库存现在应为0)。
明显的问题是,我们的物化视图(存储)应在处理第一个OrderUpdated事件时直接更新。但是,我所知道的唯一更新Kafka Stream Store的方法是发布另一个事件(OrderUpdated),以供Kafka Stream处理。这样,我们无法以事务方式执行此更新。
我希望能够得到处理此类情况的想法。
if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
    }else{
        //XXX CANCEL ORDER
    }
}

ProductService 还有一个 Kafka Streams 处理器,负责更新库存:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");
Event1将首先被处理,由于仍有1个可用产品,因此它将生成ProductReserved事件。

现在轮到Event2了。如果ProductService Kafka Streams Processor处理Event1生成的ProductReserved事件之前,ProductService consumer消耗Event2,则消费者仍然会看到product1的产品库存为1,从而为Event2生成一个ProductReserved事件,进而在系统中产生不一致性。

2个回答

4

在任何分布式系统中,保证一致性都是个典型难题。通常不会选择强一致性,而是使用流程管理器/ saga模式。这与分布式事务的2PC(两阶段提交)相似,但是在应用程序代码中明确地实现。具体如下:

订单服务要求产品服务预留N个物品。如果产品服务接受命令并减少库存,则执行命令;否则拒绝命令。如果获得正面回复,则订单服务现在可以发出OrderCreated事件(虽然我会称之为OrderPlaced,因为“placed”更符合领域的惯用语言,“created”过于通用,但这只是细节)。产品服务可以监听OrderPlaced事件或显式发送ConfirmResevation命令。或者,如果发生了其他情况(例如无法清算资金),则可以发出适当的事件或显式发送CancelReservation命令到ProductService。为了应对特殊情况,ProductService还可以有一个调度程序(在KafkaStreams中,标点符号可以派上用场),以取消未在超时期间内得到确认或中止的预订。

两个服务的编排技术和处理错误条件和补偿操作(在本例中是取消预订)可以直接在服务中处理,也可以在专门的流程管理器组件中进行隔离处理。个人建议使用Kafka Streams Processor API实现一个明确的流程管理器。


感谢您的详细解释。如果我没错的话,Kafka Streams Processor API允许我们写入远程存储,对吧?因此,您建议使用另一个服务ProcessManagerService,该服务将使用Processor API与其他服务的存储进行交互,我的疑问是:所有由服务产生的消息都必须由该管理器处理,还是仅需要为Saga提交/回滚所必需的那些消息?如果您有更多相关阅读材料,那就太棒了。 - codependent
进程管理器不会直接与远程存储交互,而只需通过Kafka发送适当的消息,相应的服务将根据其合同处理它们。进程管理器可以是自己的服务,但不一定要是。它可以只是开始流程的服务的一部分,在这种情况下是订单服务。这取决于您更喜欢整体解决方案更简洁还是更解耦。 - Michal Borowiecki
嗨,Michal,经过一段时间的权衡,我仍然无法解决这个问题,所以我取消了答案的接受。让我来证明一下:我认为你使用Saga模式的建议是完全正确的,但实现还不够清晰。你说:“订单服务要求产品服务保留N个项目。如果有足够的库存,产品服务将接受命令并减少库存,否则拒绝命令。” 这是有问题的部分:ProductService必须检查库存以接受/拒绝命令。两个并发命令会检查库存... - codependent
如果您使用Kafka Streams来实现产品服务,就不会有这个问题。您只需要按产品ID对这些消息进行分区。分区内的消息处理是顺序的。所有预定相同产品的请求将发送到同一个任务中,并且它将按顺序处理消息并更新其状态存储。从Kafka v0.11开始,您甚至可以启用Exactly-once语义以在事务性基础上完成此操作,以避免由重试导致的重复预订。否则,您必须确保您的状态存储更新是幂等的。 - Michal Borowiecki
Michal感谢您抽出时间回答。更新后的问题中的最后一段让我对产品预订的原子性产生了怀疑。 - codependent
显示剩余2条评论

4
这个答案对于你最初的问题来说有点晚了,但为了完整性,我还是回答一下。解决这个问题的方法有很多种,但我建议以事件驱动的方式来解决。这意味着你需要在一个KStreams操作中验证是否有足够的库存来处理订单并将库存作为一个单一的预留。诀窍是通过productId重新分配密钥,这样你就知道相同产品的订单将在同一线程上顺序执行(这样你就不会陷入Order1和Order2两次预留同一产品库存的情况)。有一篇文章讨论了如何做到这一点: https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
也许更有用的是,这里还有一些示例代码展示了如何实现: https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76 请注意,在这个KStreams代码中,第一行重新以productId为键,然后使用转换器(Transformer)来(a)验证是否有足够的库存来处理订单,并且(b)通过更新状态存储来预留所需的库存。这是使用Kafka的事务功能进行原子操作的。

1
嗨Ben,谢谢你的回答,我终于有时间再次处理这个问题了。它运行得很好,但有一些我不理解的地方:KStream是从订单主题创建的,那么为什么选择一个新的键(产品)会改变处理到产品的元素的顺序呢?我的意思是原始来源是订单主题分区,现在怎么会重新分配到产品呢?有关此行为的任何文档吗? - codependent

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接