使用JPA和Kafka时的最佳实践

15

我目前参与一个项目,其中使用了JPA和Kafka。我正在尝试找到一组良好的实践方法来结合这些操作。

在现有的代码中,生产者与JPA在同一个事务中使用,然而,从我所阅读的内容来看,它们似乎并不共享一个事务。

@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
    Xdto dto = xService.create(request);
    kafkaProducer.putToQueue(dto, Type.CREATE);
    return dto;
}

其中Kafka生产者的定义如下:

public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Type> template;

    public void putToQueue(Dto dto, Type eventType) {
        template.send("event", new Event(dto, eventType));
    }
}

将JPA和Kafka相结合,这是一个有效的用例吗?事务边界是否定义正确?


你是否正在尝试实现OLTP系统的变更数据捕获? - pushpavanthar
4个回答

8

当事务失败时,这种方法不能按预期工作。Kafka交互不是事务的一部分。

您可能需要查看TransactionalEventListener。您可能希望在AFTER_COMMIT事件上将消息写入kafka。即使如此,kafka发布也可能失败。

另一个选择是使用JPA将消息写入数据库,就像您正在做的那样。让debezium从数据库中读取更新后的数据并将其推送到kafka。事件将以不同的格式呈现,但更加丰富。


5
通过查看您的问题,我猜测您正在尝试实现 OLTP 系统的 CDC(Change Data Capture),即记录传输数据库中每个更改的日志。有两种方法可以解决这个问题。
  1. 应用程序代码对事务型数据库进行双重写入,并将数据发送到 Kafka。这种方法不一致,并且会影响性能。不一致是因为当您将数据写入两个独立系统时,如果其中一个写入失败,数据会出现错误,并且在事务流中将数据推送到 Kafka 会增加延迟,这是您不想妥协的。
  2. 从 DB 提交中提取更改(无论是数据库/应用程序级触发器还是事务日志),并将其发送到 Kafka。这种方法非常一致,并且不会对您的事务产生影响。之所以一致,是因为 DB 提交日志是成功提交后 DB 事务的反映。有很多解决方案可利用此方法,如 databusmaxwelldebezium 等。

如果 CDC 是您的用例,请尝试使用任何已经可用的解决方案。


3

正如其他人所说,您可以使用变更数据捕获(change data capture, CDC)来安全地将应用于数据库的更改传播到Apache Kafka。由于后者不支持任何类型的两阶段提交协议,因此您无法在单个事务中同时更新数据库和Kafka。

您可以对表本身进行CDC,或者如果您希望对发送到Kafka的结构有更多控制,则可以应用“outbox”模式。在这种情况下,您的应用程序将写入其实际的业务表以及一个包含要发送到Kafka的消息的“outbox”表。您可以在此博客文章中找到关于此方法的详细描述。

声明:我是这篇文章的作者,也是Debezium的主导者之一,Debezium是其他答案中提到的CDC解决方案之一。


0

在kafka中发送消息时,不应该将其放在事务中。如果需要在发送事件到kafka失败时回滚事务,则最好在此情况下使用spring-retry。只需将与发送事件到kafka相关的代码放在@Retryable注释方法中,并添加具有撤消之前对数据库所做更改逻辑的@Recover注释方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接