如何为ConcurrentKafkaListenerContainerFactory设置RecordInterceptor

3
我正在使用Spring Kafka 2.2.7,我已经配置了@EnableKafkakafkaListenerContainerFactory,并使用@KafkaListener来消费消息,一切都按预期运行。
我想添加一个RecordInterceptor来记录所有被消费的消息,但是我发现很难配置它。 文档指出可以在容器上设置RecordInterceptor,但我不知道如何获取容器的实例。
自版本2.2.7开始,可以向监听器容器中添加RecordInterceptor;在调用侦听器之前将调用它,以允许检查或修改记录。
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setConcurrency(consumerCount);
        return factory;
    }

我已经查阅了Spring文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了什么。
如有任何帮助将不胜感激。
提前致谢。
1个回答

3

2.2.7版本开始,有一个名为setRecordInterceptor的方法可用。


该方法可以设置记录拦截器。
factory.setRecordInterceptor(new RecordInterceptor);

另外还有一条信息,RecordInterceptor 在批量监听器中不起作用。

从2.2.7版本开始,您可以将RecordInterceptor添加到监听器容器中,在调用监听器之前调用它以允许对记录进行检查或修改。如果拦截器返回null,则不会调用监听器。当监听器是批量监听器时,不会调用拦截器。


1
谢谢你指出来。我知道我错过了一些愚蠢的东西,但它甚至更愚蠢!当我检查工厂是否有这个方法时,我使用的是2.2.3版本,显然它不在那里。升级后忘记再次检查了。 - Asif
setRecordInterceptor在kafka消费者配置中是否像ConsumerConfig.MAX_POLL_RECORDS_CONFIG=1一样起作用? - Ali Yeganeh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接