当Kafka宕机时,我该如何处理IOException?

14
我正在尝试发布消息,但Apache Kafka已经宕机了。 像这样的紧急情况应该如何处理?
KafkaProducer::send()方法不会抛出任何可以处理的异常。生产者会将它们吞掉并记录错误,因此我会收到大量类似的错误消息,而且一切都会停滞,直到Kafka重新连接上。
2014-03-31 09:38:23.752 ERROR o.a.kafka.common.network.Selector - Error in I/O: 
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_51]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_51]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:205) ~[kafka-clients-0.8.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212) [kafka-clients-0.8.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:150) [kafka-clients-0.8.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

你有找到解决方案吗?这一直是我正在开发的审计应用程序的一个小负担。 - Kenny Cason
2个回答

5
在由Producer.send(...)返回的future上调用get(),或者如果您不想阻塞您的代码,则传递回调函数

try { producer.send(new ProducerRecord("mytopic", key, value)) .get(); // 阻塞直到确认 } catch(Exception e) { // 处理消息未被确认的情况 }


1
我确信这不是:回调函数和期货在2014年12月的“新”生产者客户端中已经被添加了。但现在我们有了答案! - Peter Davis

-2

我认为你可以做的一件事是在你的生产者代码中使用try/catch来记录所有Throwable


2
KafkaProducer会像pivovarit所说的那样吞噬错误。请参见https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L124。 - James Bloomer
或者查看 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L276 获取精确的错误信息。 - James Bloomer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接