我刚开始使用Kafka 0.9并测试了一些特性,发现Java实现的消费者(KafkaConsumer
)表现出一种奇怪的行为。
Kafka代理位于Ambari外部机器上。
虽然我能够实现生产者并开始向外部代理发送消息,但我不知道为什么当消费者尝试读取事件(poll)时,它会被卡住。
我知道生产者工作正常,因为我可以通过控制台消费者(在Ambari本地工作)来消费消息。但是当我执行Java消费者时,什么都没有发生,就被卡住了。调试代码后,我发现它被阻塞在poll()
行:
ConsumerRecords<String, String> records = consumer.poll(100);
顺便说一下,超时不起任何作用。无论您将其设置为0、100或1000毫秒,消费者都会在此行中被阻塞,不会超时也不会抛出异常。
我尝试了各种替代属性,例如advertised.host.name、advertised.listener等等,但都没有成功。
非常感谢任何帮助。提前致谢!