Spring Boot Kafka 配置 DefaultErrorHandler?

4

我按照Spring Kafka文档创建了一个批量消费者:

@SpringBootApplication
public class ApplicationConsumer {
  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
  private static final String TOPIC = "foo";

  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(ApplicationConsumer.class, args);
  }

  @Bean
  public RecordMessageConverter converter() {
    return new JsonMessageConverter();
  }

  @Bean
  public BatchMessagingMessageConverter batchConverter() {
    return new BatchMessagingMessageConverter(converter());
  }

  @KafkaListener(topics = TOPIC)
  public void listen(List<Name> ps) {
    LOGGER.info("received name beans: {}", Arrays.toString(ps.toArray()));
  }
}

我通过定义以下额外的配置环境变量,成功使消费者运行起来了。Spring会自动识别这些变量:

export SPRING_KAFKA_BOOTSTRAP-SERVERS=...
export SPRING_KAFKA_CONSUMER_GROUP-ID=...

所以上面的代码是有效的。但现在我想自定义默认错误处理程序,使用指数退避。从参考文档中,我尝试将以下内容添加到ApplicationConsumer类:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    return props;
}

但现在我遇到了错误,说它找不到一些配置。看起来我被困在了必须重新定义consumerConfigs()中已经自动定义的所有属性的境地。这包括从引导服务器uri到json反序列化配置的所有内容。
有没有好的方法可以更新我的代码第一个版本,只覆盖默认错误处理程序?
1个回答

7

只需将错误处理程序定义为@Bean,Boot 就会自动将其连接到其自动配置的容器工厂中。

编辑

这对我来说运行得很好:

@SpringBootApplication
public class So70884203Application {

    public static void main(String[] args) {
        SpringApplication.run(So70884203Application.class, args);
    }

    @Bean
    DefaultErrorHandler eh() {
        return new DefaultErrorHandler((rec, ex) -> {
            System.out.println("Recovered: " + rec);
        }, new FixedBackOff(0L, 0L));
    }

    @KafkaListener(id = "so70884203", topics = "so70884203")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so70884203").partitions(1).replicas(1).build();
    }

}

foo
Recovered: ConsumerRecord(topic = so70884203, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1643316625291, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)

我在我的ApplicationConsumer中添加了以下内容,但在运行时似乎仍在使用FixedBackoff。你知道为什么吗?@Bean公共DefaultErrorHandler defaultErrorHandler(){return new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10));} - shj
我刚刚测试了一下(再次),它按预期工作;请参见编辑。如果您无法弄清楚问题出在哪里,请在某个地方发布一个MCRE,以便我可以看到问题所在。 - Gary Russell
非常感谢你的帮助。问题最终是因为我在项目中使用了一个较旧版本的spring依赖项。更新后,注入开始工作了。 - shj
1
最好只指定引导程序版本,让其依赖管理决定其他 jar 的正确版本。 - Gary Russell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接