Kafka消费者的poll()方法被阻塞了。

11

我刚开始使用Kafka 0.9并测试了一些特性,发现Java实现的消费者(KafkaConsumer)表现出一种奇怪的行为。

Kafka代理位于Ambari外部机器上。

虽然我能够实现生产者并开始向外部代理发送消息,但我不知道为什么当消费者尝试读取事件(poll)时,它会被卡住。

我知道生产者工作正常,因为我可以通过控制台消费者(在Ambari本地工作)来消费消息。但是当我执行Java消费者时,什么都没有发生,就被卡住了。调试代码后,我发现它被阻塞在poll()行:

    ConsumerRecords<String, String> records = consumer.poll(100);

顺便说一下,超时不起任何作用。无论您将其设置为0、100或1000毫秒,消费者都会在此行中被阻塞,不会超时也不会抛出异常。

我尝试了各种替代属性,例如advertised.host.nameadvertised.listener等等,但都没有成功。

非常感谢任何帮助。提前致谢!


是的,我是。我可以通过控制台消费者从托管Ambari的机器上消费消息。 - aran
你运行消费者的机器上呢?你尝试在控制台消费者上运行了吗? - David Griffin
2
你不需要在那里安装Zookeeper,只需在某个地方解压Kafka二进制文件即可。如果您想排除网络连接问题(防火墙等),那么您几乎必须这样做。否则,您无法排除这些问题。您的问题可能很简单,例如您的消费者由于防火墙问题而无法连接到Zookeeper实例。 - David Griffin
谢谢你的帮助,David。如果我有好消息,我会更新这个;) - aran
有没有办法调试这个问题?我遇到了一个消费者似乎没有消费的相同问题。 - Havnar
显示剩余3条评论
2个回答

8

可能的原因是运行您的消费者代码的机器无法连接到zookeeper。尝试在安装Kafka的机器上运行相同的消费者代码(我尝试过并且对我有效)。我还通过在server.properties文件中提及以下属性来解决了问题:

advertised.host.name="ip address which you want to expose"

// In my case, it is the public IP of the EC2 machine, I have kafka and zookeeper installed on the same EC2 machine.

advertised.port=9092

关于这个声明:
ConsumerRecords<String, String> records = consumer.poll(100);

上述语句并不意味着消费者会在100毫秒后超时;相反,它是轮询周期。在100毫秒内捕获的任何数据都将读入记录集合。


1
无论它在100毫秒内捕获了什么数据,都将被读取到记录集合中。这并不一定正确。超时是当没有立即可供消费的记录时轮询时间的上限。从 javadoc (KafkaConsumer.poll(Duration timeout)) 中可以看出:如果有记录可用,则此方法将立即返回。否则,它将等待传递的超时时间。 如果超时到期,将返回一个空记录集。请注意,此方法可能会阻塞超过超时以执行自定义{@link ConsumerRebalanceListener}回调。 - Emil Koutanov
在我的情况下,即使我使用(KafkaConsumer.poll(Duration timeout)),有时它会陷入无限循环并且无法退出。不确定如何使其更加健壮和稳定。有什么想法吗? - SymboCoder

1
在我的情况下,poll()方法最终卡在ensureCoordinatorReady()的无限循环中,Coordinator这个词告诉我协调器在另一台主机上运行。(为了测试目的,我只向/etc/hosts添加了一个broker主机,而总共有三个broker)。因此,消费者正确地获取了消费者协调器。
因此,解决方案出现了:在/etc/hosts文件中正确配置运行kafka broker的主机。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接