如何安全地取消订阅 Kafka 主题

3

我有一个简单的Java程序(已经Docker化),并且在Kubernetes(Pod)中部署。这个Java程序只是一个普通的Java项目,用于监听和消费特定的主题。例如:SAMPLE-SAFE-TOPIC。

我必须安全地取消订阅此主题,这意味着即使删除此Pod(Java消费者),也不会丢失任何数据。

以下是我搜索到的代码:

 public static void unsubscribeSafelyFromKafka() {  

  logger.debug("Safely unsubscribe to topic..");

  if (myKakfaConsumer != null) {
        myKafkaConsumer.unsubscribe();
        myKafkaConsumer.close();
     }
}

我需要通过命令行运行这段代码,其中Java程序已经存在一个静态的main方法。

我的问题是:

  1. 上面的代码是否保证没有记录会丢失?
  2. 当已经存在一个静态的main()时,如何通过命令行触发上述代码?

注意:我正在通过命令行运行Java项目。例如:java -jar MyKafkaConsumer.jar,因为这是要求。

请帮忙。


不确定您的意思是“Java程序已经存在静态main方法”。 - lazy.coder
嗨@lazy.coder,我有一个Java项目,它已经有了public static void main(String [] args),它监听并消费主题“SAMPLE-SAFE-TOPIC”,所以我想创建一个新方法,安全地取消订阅kafka,这样看起来就像我有2个static void main()但根据我的研究,它说不可能。 :(您有任何建议可以满足要求吗?谢谢您的回复。 - Yejin
为什么不创建一个新的方法来取消订阅主题,并从任何需要的地方调用该方法呢?为什么需要两个主要方法? - lazy.coder
嗨@lazy.coder,那正是我所做的。我正在通过命令行运行,例如“java -jar <jar文件>”。我尝试创建一个新类,其中包含取消订阅方法并执行“java -classpath <jar文件> <package.name.className>”,但它显示错误找不到主类。当我将其更改为main时,它也会抛出错误,因为我有两个mains。您是否有关于如何实现此目标的示例代码片段?提前致谢。 - Yejin
1个回答

2
如果我正确理解问题1,您担心通过控制台命令触发的一个线程取消订阅后,轮询消费者正在处理可能会丢失的一批记录,如果Pod被杀死,则存在风险?
如果您有其他的Pod作为同一消费者组的一部分进行消费,或者如果此Pod或任何Pod使用相同的组ID重新订阅,则最后提交的偏移量将确保不会丢失记录(尽管某些记录可能会被处理多次),因为接管的消费者将从那里开始。
如果您使用自动提交,那是最安全的,因为每个提交都在随后的轮询中发生,因此您不可能提交未处理的记录(只要您不产生额外的线程来处理)。手动提交则由您决定何时处理记录以及何时安全提交。
但是,在取消订阅后调用close是一个好主意,并应确保当前轮询批次的干净完成和最终偏移量的提交,只要所有这些都在超时期内完成即可。
关于问题2,如果您需要手动取消订阅,那么我认为您需要使用JMX或公开API等方式调用运行的JVM上的方法。但是,如果您只是想确保Pod终止时的安全关闭,您可以在关闭钩子中取消订阅,或者根本不必担心,因为偏移量提交提供了安全保障。

您好,@Chris先生,我只是想确认一下这种方法,我需要在取消订阅kafka消费者主题时设置超时时间,对吗?谢谢。 - Yejin
嗨Olah - 是的,如果您想要取消订阅并等待当前批次处理完毕,那么调用close()将等待30秒,或者有一个重载版本的close,您可以指定超时时间-https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close()。一般来说,如果您可以处理重复项,则Kafka消费者代码最简单-如果您可以处理重复项,则无需担心Pod停止和启动,Kafka消费的这个方面不会丢失任何消息,特别是如果您使用自动提交。 - Chris

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接