一个带有两个服务的购物应用:
- OrderService:具有Kafka Streams存储订单(OrdersStore)。 - ProductService:具有Kafka Streams存储产品及其库存(ProductStockStore)。
流程如下:
1. OrderService发布OrderCreated事件(包含productId、orderId、userId信息)。 2. ProductService获取OrderCreated事件并查询其KafkaStreams Store(ProductStockStore),以检查该产品是否有库存。如果有库存,则发布OrderUpdated事件(也包含productId、orderId、userId信息)。 3. 此事件将被ProductService Kafka Stream监听,后者将对其进行处理以减少库存。
但是,假设以下情况:
1. 客户1下了一个名为order1的订单(该产品有1件库存)。 2. 同时,客户2下了另一个名为order2的订单,也是该产品(库存仍为1)。 3. ProductService处理order1并发送消息OrderUpdated以减少库存。此消息在来自order2的OrderCreated消息之后放置在主题中。 4. ProductService处理order2-OrderCreated并再次发送消息OrderUpdated以再次减少库存。这是不正确的,因为它会引入不一致性(库存现在应为0)。
明显的问题是,我们的物化视图(存储)应在处理第一个OrderUpdated事件时直接更新。但是,我所知道的唯一更新Kafka Stream Store的方法是发布另一个事件(OrderUpdated),以供Kafka Stream处理。这样,我们无法以事务方式执行此更新。
我希望能够得到处理此类情况的想法。
if("OrderPlaced".equals(event.get("eventType"))){
Order order = new Order();
order.setId((String)event.get("orderId"));
order.setProductId((Integer)(event.get("productId")));
order.setUid(event.get("uid").toString());
// QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
Integer productStock = getProductStock(order.getProductId());
if(productStock > 0) {
Map<String, Object> event = new HashMap<>();
event.put("name", "ProductReserved");
event.put("orderId", order.getId());
event.put("productId", order.getProductId());
// WRITES A PRODUCT RESERVED EVENT TO orders topic
orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
}else{
//XXX CANCEL ORDER
}
}
ProductService 还有一个 Kafka Streams 处理器,负责更新库存:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");
Event1将首先被处理,由于仍有1个可用产品,因此它将生成ProductReserved事件。
现在轮到Event2了。如果ProductService Kafka Streams Processor处理Event1生成的ProductReserved事件之前,ProductService consumer消耗Event2,则消费者仍然会看到product1的产品库存为1,从而为Event2生成一个ProductReserved事件,进而在系统中产生不一致性。