我按照Spring Kafka文档创建了一个批量消费者:
@SpringBootApplication
public class ApplicationConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
private static final String TOPIC = "foo";
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(ApplicationConsumer.class, args);
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(topics = TOPIC)
public void listen(List<Name> ps) {
LOGGER.info("received name beans: {}", Arrays.toString(ps.toArray()));
}
}
我通过定义以下额外的配置环境变量,成功使消费者运行起来了。Spring会自动识别这些变量:
export SPRING_KAFKA_BOOTSTRAP-SERVERS=...
export SPRING_KAFKA_CONSUMER_GROUP-ID=...
所以上面的代码是有效的。但现在我想自定义默认错误处理程序,使用指数退避。从参考文档中,我尝试将以下内容添加到ApplicationConsumer类:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
return props;
}
但现在我遇到了错误,说它找不到一些配置。看起来我被困在了必须重新定义consumerConfigs()中已经自动定义的所有属性的境地。这包括从引导服务器uri到json反序列化配置的所有内容。
有没有好的方法可以更新我的代码第一个版本,只覆盖默认错误处理程序?