在数据库和Kafka生产者之间同步事务

76
我们拥有一个微服务架构,使用Kafka作为服务之间的通信机制。一些服务有自己的数据库。假设用户调用A服务,应该在该服务的数据库中创建一条记录(或一组记录)。此外,这个事件应该作为Kafka主题上的项目报告给其他服务。如何确保只有在成功更新Kafka主题后才写入数据库记录(基本上在数据库更新和Kafka更新周围创建分布式事务)是最好的方法?
我们正在考虑在Spring Boot WebFlux服务中使用spring-kafka,我发现它有一个KafkaTransactionManager,但据我了解,这更多是关于Kafka事务本身的(确保Kafka生产者和消费者之间的一致性),而不是在两个系统之间同步事务(请参见这里:“Kafka不支持XA,你必须处理DB tx可能会提交而Kafka tx回滚的可能性。”)。此外,我认为这个类依赖于Spring的事务框架,至少就我目前的理解而言,这是线程绑定的,如果使用反应式方法(例如WebFlux),其中操作的不同部分可能在不同的线程上执行,那么它将无法工作。(我们正在使用reactive-pg-client,因此手动处理事务,而不是使用Spring的框架。)
我能想到的一些选项:
  1. 不要直接将数据写入数据库:只需将其写入Kafka。然后使用消费者(在服务A中)更新数据库。这似乎不是最有效的方法,并且会出现问题,因为用户调用的服务无法立即看到它应该创建的数据库更改。
  2. 不要直接写入Kafka:只需写入数据库,然后使用类似于Debezium的工具将更改报告给Kafka。问题在于更改基于单个数据库记录,而存储在Kafka中的业务重要事件可能涉及来自多个表的数据组合。
  3. 首先写入数据库(如果失败,则不执行任何操作并仅抛出异常)。然后,在写入Kafka时,假设写入可能失败。使用内置的自动重试功能尝试保持一段时间。如果最终完全失败,请尝试写入死信队列,并创建某种手动机制供管理员进行排序。如果写入DLQ失败(即Kafka完全关闭),则以其他方式记录它(例如,记录到数据库中),并再次创建某种手动机制供管理员进行排序。

有没有人对上述内容有任何想法或建议,或者能够纠正我上面的错误?

提前感谢!


任何事务管理都与“线程”相关联。如果它们在不同的线程上,就不可能将DB包含到Kafka TX中。您可以查看“ChainedKafkaTransactionManager”,但这仍然涉及Spring事务。此外,如果您的项目中一切都是反应式的,也可以查看Reactor Kafka:https://github.com/reactor/reactor-kafka - Artem Bilan
你可以在DB TX Manager中注册TransactionSynchronization。请参阅TransactionSynchronizationManager,尽管我不知道它如何帮助你,因为你担心的是反应式和非单线程执行... - Artem Bilan
非常感谢。我会查看那些类。使用reactive-pg-client时,启动事务的线程不一定是完成事务的线程,因此有关线程的问题。 - Yoni Gibbs
1
为了避免描述您不需要的解决方案:您真的有必要确保 Kafka 消息已发送才提交数据到数据库吗?还是仅需确保它将被发送(换句话说:您可以确信消息将在数据提交后不久出现在 Kafka 中)就足够了? - Jonas
2
感谢@Jonas。对我们来说,顺序并不重要。事情只需要是原子的:要么同时更新DB和Kafka,要么两者都不更新。我们采用了使用Debezium的CDC方法,但如果您有其他选项,那将会很有趣听到。 - Yoni Gibbs
显示剩余5条评论
5个回答

37

我建议使用稍微改进的第二种方法。

只写入您的数据库,但除了实际的表写入之外,还在同一数据库中的特殊表中写入“事件”记录;这些事件记录将包含您需要的聚合。最简单的方式是,您可以插入另一个实体,例如由JPA映射,其中包含聚合有效载荷的JSON属性。当然,这可以通过某种事务侦听器/框架组件自动化。

然后使用Debezium仅捕获来自该表的更改并将其流式传输到Kafka。这样你既有:在Kafka中最终一致的状态(Kafka中的事件可能落后或者在重新启动后可能会看到几个事件两次,但最终它们将反映数据库状态),又不需要分布式事务,并且具有业务级别的事件语义。

(免责声明:我是Debezium的负责人;有趣的是,我正在撰写一篇博客文章详细讨论这种方法)

以下是帖子链接:

https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/

可靠的微服务数据交换 - Outbox模式


话虽如此,我对你们的事件的业务级语义很好奇,除了表示多个表的连接之外,还有什么其他的作用呢?也许你可以来我们的邮件列表并提供一些你使用情况的细节?我们希望更好地理解这些用例,并计划为它们提供更好的支持。谢谢! - Gunnar
1
谢谢Gunnar。有趣的是,我刚刚阅读了这篇文章,它建议使用CDC风格的方法(尽管没有您的“事件表”建议)。是的,你说得对,它基本上是基于多个表连接而成的数据,被视为单个业务重要实体,我们可能想要将其报告给Kafka。我们现在处于研发阶段,但一旦我们有更多细节,我们将加入您的邮件列表并在那里发布更多详细信息。再次感谢! - Yoni Gibbs
另外,我非常想阅读你即将发布的博客文章:如果准备好了,也许你可以在这里发布链接? - Yoni Gibbs
6
如果有人感兴趣,Gunnar的文章现在在这里 - Yoni Gibbs
3
我们在Debezium博客上发布了另一篇更通用的关于这种模式(“outbox pattern”)的文章一段时间前 - Gunnar

20

首先,我必须说我不是卡夫卡,也不是Spring专家,但我认为当写入独立资源时更多的是概念上的挑战,解决方案应该适应您的技术栈。此外,我应该说这个解决方案尝试着在没有外部组件(比如Debezium)的情况下解决问题,因为在选择这样的选项时,每个额外的组件都会在测试、维护和运行应用程序方面带来挑战,而这往往被低估。另外,并非每个数据库都可以用作Debezium源。

为了确保我们讨论的是相同的目标,让我们以简化的航空公司示例来澄清情况,客户可以购买机票。成功下单后,客户将收到由外部消息系统(我们必须与之通信的系统)发送的消息(邮件、推送通知等)。

在传统的JMS世界中,我们的数据库(存储订单)与JMS提供者之间有一个XA事务,在这种情况下会像下面这样:客户端将订单设置到我们的应用程序中,我们开始一个事务。应用程序将订单存储在其数据库中。然后将消息发送到JMS,您可以提交事务。即使它们正在与自己的资源交互,两个操作也都参与了事务。由于XA事务保证了ACID,我们没问题。

让我们考虑如何使用Kafka(或其他无法参与XA事务的资源)。由于不再有协调器来同步两个事务,因此以下内容的主要思想是将处理拆分为两个具有持久状态的部分。

当你将订单存储在数据库中时,你也可以将要发送到Kafka的消息(带有聚合数据)存储在同一数据库中(例如作为CLOB列中的JSON),使用相同的资源,ACID保证,一切都很好。现在,你需要一个机制来轮询“KafkaTasks”表以获取应发送到Kafka主题的新任务(例如使用定时器服务,在Spring中可能可以使用@Scheduled注释)。将消息成功发送到Kafka后,你可以删除任务条目。这确保了仅当订单也成功存储在应用程序数据库中时才发送消息到Kafka。我们是否实现了使用XA事务时所拥有的相同保证?不幸的是没有,因为仍然存在写入Kafka成功但删除任务失败的可能性。在这种情况下,重试机制(如你问题中提到的那个)将重新处理该任务并发送两次消息。如果你的业务场景对这种“至少一次”的保证感到满意,那么你就可以使用一个半复杂但易于实现为框架功能的解决方案,这样不必每个人都去烦恼细节。

如果需要“仅一次”功能,则不能将状态存储在应用程序数据库中(在这种情况下,“删除任务”是“状态”),而必须将其存储在Kafka中(假设在两个Kafka主题之间具有ACID保证)。例如:假设表中有100个任务(ID为1到100),任务作业处理前10个任务。您将Kafka消息写入其主题并向另一个带有ID 10的主题写入另一条消息。所有都在同一个Kafka事务中。在下一个周期中,您会消耗您的主题(值为10),并使用此值获取接下来的10个任务(并删除已处理的任务)。
如果有更简单(应用内)的解决方案具有相同的保证,则期待能够听到您的建议!
对不起回答有点长,但我希望这可以帮助您。

1
CDC是最好的方法,但当您不想要复杂性且事件数量巨大时,您可以通过让Async Kafka使用“full ack = all guarantees”来完成其工作,从而减少轮询(SPOF风险)大小,并在Kafka Publisher在其回调中以自己的时间响应(在超时内),您可以更新数据库列,指示Kafka消息已成功发布。假设JVM崩溃或消息未发布时仅有非常有限的数据丢失,那么“轮询数据以查找未发布的消息”的数量将显着减少。 - kisna
2
这是一个非常强大的解决方案。关于“仅一次”的语义:简而言之,我认为这不是一个问题。理想情况下,您的消费者已经在编码防御措施,以防止重复消息的可能性(根据kafka的文档,在可变网络条件下,重复可能会自然发生)。 - mattbrosenberg

12

上述所有方法都是解决问题的最佳方式,是定义清晰的模式。您可以在下面提供的链接中探索这些模式。

模式:事务性outbox

将事件或消息作为数据库事务的一部分发布,通过在数据库中保存它来保存在OUTBOX中。 http://microservices.io/patterns/data/transactional-outbox.html

模式:轮询发布者

通过轮询数据库中的outbox来发布消息。 http://microservices.io/patterns/data/polling-publisher.html

模式:事务日志尾随

通过尾随事务日志来发布对数据库所做的更改。 http://microservices.io/patterns/data/transaction-log-tailing.html


1
我建议使用一种新的2阶段消息方法。在这种新方法中,需要的代码量大大减少,而且不再需要Debezium。

https://betterprogramming.pub/an-alternative-to-outbox-pattern-7564562843ae

这种新方法需要做的是:
  1. 在编写数据库时,向辅助表中写入事件记录。
  2. DTM 提交一个2阶段消息。
  3. 编写一个服务来查询事件是否保存在辅助表中。
借助 DTM SDK 的帮助,在 Go 中可以用8行代码完成上述3个步骤,比其他解决方案少得多的代码量。
msg := dtmcli.NewMsg(DtmServer, gid).
  Add(busi.Busi+"/TransIn", &TransReq{Amount: 30})
err := msg.DoAndSubmitDB(busi.Busi+"/QueryPrepared", db, func(tx *sql.Tx) error {
    return AdjustBalance(tx, busi.TransOutUID, -req.Amount)
})

app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
    return MustBarrierFromGin(c).QueryPrepared(db)
}))

您的每个起源选项都有其缺点:
  1. 用户无法立即看到其刚刚创建的数据库更改。
  2. Debezium将捕获数据库日志,这可能比您想要的事件要大得多。此外,部署和维护Debezium并不容易。
  3. “内置自动重试功能”并不便宜,可能需要大量代码或维护工作。

1
Debezium是一个可行的解决方案,但(就我的经验而言)它可能需要额外的开销来运行一个额外的pod,并确保该pod不会崩溃。这可能只是我在一些连续的情况下抱怨,其中pod OOM出错并且没有重新启动,网络规则部署丢失了一些消息,对aws aurora db的WAL访问开始表现异常...似乎所有可能出错的事情都发生了。并不是说Debezium不好,它非常稳定,但通常对于开发人员来说,运行它变成了一项网络技能而不是编码技能。
作为一个KISS解决方案,使用正常的编码解决方案将在99.99%的时间内工作(并告知您0.01%的情况):
- 开始事务 - 同步保存到数据库 - -> 如果失败,则退出。 - 异步发送消息到kafka。 - 阻塞直到主题报告已接收到消息。 - -> 如果超时或失败,则中止事务。 - -> 如果成功,则提交事务。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接