我正在尝试实现一个基于Spring Boot的Kafka消费者,它具有非常强的消息传递保证,即使出现错误也是如此。
- 必须按顺序处理来自分区的消息,
- 如果消息处理失败,则应暂停对特定分区的消耗,
- 应该使用退避重试处理,直到成功为止。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
final ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProperties.setErrorHandler(errorHandler());
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(1.5);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}
然而,在这里,记录被消费者永久性地锁定。在某个时刻,处理时间将超过max.poll.interval.ms
,服务器将重新分配分区给其他消费者,从而创建副本。
假设max.poll.interval.ms
等于5分钟(默认值),且故障持续30分钟,则会导致消息被处理约6次。
另一种可能性是在N次重试后(例如3次尝试)将消息返回到队列中,使用SimpleRetryPolicy
。然后,消息将被重新播放(感谢SeekToCurrentErrorHandler
),并且处理将从头开始,最多进行5次尝试。这将导致形成一系列延迟。
10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...
比持续上升的数字更不受欢迎 :)
是否有第三种情况可以保持延迟形成一个升序序列,同时又不在上述示例中创建重复项?