如何在 @KafkaListener 中使用 @Transactional?

7

是否有可能在使用@KafkaListener注释的方法中使用声明式事务管理(通过@Transactional)?我想使用它来定义每个监听器的单独事务超时时间。我的设置如下:

事务管理器:

@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager<>(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}

Kafka监听器:

@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}

问题在于 - KafkaMessageListenerContainer在调用方法之前会创建自己的事务 - 它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
    : null;
}

TransactionInterceptor 没有使用。那么如何为具体的 @KafkaListener 方法设置特定的事务超时时间?

2个回答

3
通过简单的@Transactional事务管理方面在Kafka监听器中无法工作,因为没有内置交互。有一个专门的章节介绍如何在事务中使用应用程序事件:Transaction Bound Event,使用注释@TransactionalEventListener可以帮助确保事务已成功完成。

您可以使用@EventListener注释注册常规事件侦听器。如果需要将其绑定到事务,请使用@TransactionalEventListener。这样做后,默认情况下侦听器会绑定到事务的提交阶段。


2
可以做到,但有些复杂,因为您需要将已使用的offset(s)发送到Kafka事务。
您可以使用KafkaTransactionManager来替代ChainedKafkaTransactionManager,并在HibernateTransactionManager中使用@Transactional。这样做会得到类似的结果,因为 Hibernate TX 将在 Kafka 事务之前提交(如果 Hibernate 提交失败,则 Kafka TX 将回滚)。 编辑 要为每个侦听器容器配置不同的链接TM,可以执行以下操作。
@Component
class ContainerFactoryCustomizer {

    ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> chainedOne,
            ChainedKafkaTransactionManager<?, ?> chainedTwo) {
        factory.setContainerCustomizer(
                container -> {
                    String groupId = container.getContainerProperties().getGroupId();
                    if (groupId.equals("foo")) {
                        container.getContainerProperties().setTransactionManager(chainedOne);
                    }
                    else {
                        container.getContainerProperties().setTransactionManager(chainedTwo);
                    }
                });
    }

}

每个链接的TM都有一个不同默认超时时间的Hibernate TM。

groupid是从@KafkaListeneridgroupId属性填充的。


谷瑞尔。谢谢。但是,您能否提供更详细的建议,如何为每个KafkaListener实现自定义tx超时?我仍然希望与hibernate tx管理器同步。您说可以做到这一点,但我不知道如何做。 - Łukasz Torzyński
如果我选择第一个选项(不使用链接的tm),那么我是否需要手动将已消耗的offset发送到kafka事务? 不需要;容器对DB tx毫不知情,并且将发送offsets(如果DB TX提交)。 如果DB提交失败,则Kafka TX将回滚(并且回滚后处理器将重新查找分区)。 注释不是“完全被忽略”,默认传播是“REQUIRED”,拦截器会看到存在现有事务,因此无需启动另一个事务。仅在容器不具有事务性时才需要自己发送offsets。 - Gary Russell
更新:现在它能正常工作了,问题在于我正在使用isolation-level=read_committed在控制台消费者中读取已发送的消息。但是如果@Transactional在生产者端使用Hibernate TM而不是ChainedTM,这样做是否可以?看起来它能正常工作。 - Łukasz Torzyński
是的,请查看我在此GitHub问题上的评论(https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/869#issuecomment-604682229)。该模板将Kafka事务与现有的Hibernate事务同步,并在Hibernate事务之后提交。 - Gary Russell
1
正确;但对于仅生产者的事务,使用事务同步更简单。 - Gary Russell
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接