Kafka 消费者 WakeupException 处理 Java

8
如果我在一个线程中运行kafka消费者,并且不从外部进行操作,例如将消费者置于休眠或唤醒它,则有必要正确处理WakeupException吗?以及如何处理它的好方法是什么?
消费者正在Web服务上运行,用于不断地从队列中拉取数据,并且永远不应停止执行。此外,该服务没有空闲或挂起状态。在Kafka文档中指出,只有当kafka消费者被另一个线程阻塞时才会抛出该异常,但这种情况永远不会发生。 https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html Kafka版本0.10.0.0
catch (WakeupException e) {
    LOG.info("Kafka Consulmer wakeup exception");
    // Ignore exception if closing
    if (!closed.get()) {
        throw e;
    }
} finally {
    consumer.close();
}

问候,Rakesh

1个回答

8
您可以在Confluent文档中找到几个示例,展示了如何正确处理WakeupException [Confluent消费者文档]
基本上,如果您使用consumer.poll(Integer.MAX_VALUE),则消费者将一直阻塞,直到获取到消息。在这种情况下,如果您想要停止消费,可以调用consumer.wakeup()(从另一个线程)并捕获异常以正常关闭。
此外,在同步提交偏移量时,调用consumer.wakeup()将抛出一个WakeupException异常。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接