是否有可能在使用@KafkaListener注释的方法中使用声明式事务管理(通过@Transactional)?我想使用它来定义每个监听器的单独事务超时时间。我的设置如下:
事务管理器:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
Kafka监听器:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
问题在于 - KafkaMessageListenerContainer在调用方法之前会创建自己的事务 - 它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
TransactionInterceptor 没有使用。那么如何为具体的 @KafkaListener 方法设置特定的事务超时时间?