使用Spring Kafka手动提交Kafka消费者的偏移量的方法

3
我正在尝试使用Spring-Kafka(1.1.0.RELEASE)手动提交Kafka消费者的偏移量。 我明白,为了实现强大的客户端,最好提交这些偏移量,以便其他消费者不会处理可能由已死的消费者或因触发重新平衡而最初处理的重复事件。

我看到有两种处理方式 -

  • Set the ACK_MODE to MANUAL_IMMEDIATE and in the listener implementation, invoke ack.acknowledge() API

    ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    
    @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
    public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
        logger.info("===*** batch size : " + batch.size() + "***===");
        batch.forEach(System.out::println);
        acknowledgment.acknowledge();
    }
    
  • Implement a ConsumerRebalanceListener.

    ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> collection) {
    
    }
    
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> collection) {
    
    }
    });
    
然而,采用这种方法后,我不知道如何获取对消费者的引用,以调用consumer.commitSync()consumer.commitASync() API。由于某些技术限制,我无法升级到支持ConsumerAwareRebalanceListener并具有对Consumer的引用的最新版本的spring-kafka。

那么,利用ConsumerRebalanceListener来能够提交偏移量到Kafka的方法是什么?
此外,我正在使用ConcurrentKafkaListenerContainerFactory.setConcurrency() API启动多个消费者监听线程,因此如果特定消费者线程失败,它是否具有自己的ConsumerRebalanceListener实例?

尽量避免使用内部Kafka提交。我会选择使用外部偏移处理选项,这样更安全。例如,我使用Zookeeper。在此Cloudera文章中可以了解更多详细信息:https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ - dbustosp
1个回答

1

消费者重新平衡监听器不能用于提交偏移量;在1.x中,唯一的方法是通过Acknowledgment参数。

1.1.0非常古老;建议所有1.x用户升级到至少1.3.5;由于KIP-62的贡献,它具有更简单的线程模型——请参见项目页面

当前版本2.1.6(2.1.7将在今天发布)具有更多选项,包括ConsumerAwareMessageListener,您可以完全访问消费者。

在版本<1.3中,您无法直接从侦听器与消费者交互,因为消费者不是线程安全的,并且必须在不同的线程上调用侦听器。


如何在Kafka 2xx中实现? - Swapnil
请查看MANUAL_IMMEDIATE文档。如果您仍有疑问,请提出一个新问题并提供更多细节。 - Gary Russell
我已经提出了一个更详细的新问题。请在这里查看 https://stackoverflow.com/questions/60730281/spring-kafka-manual-acknowledgement - Swapnil

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接