消费者未收到消息,kafka控制台,新的消费者API,Kafka 0.9

44

我正在进行Kafka 0.9.0.0的Kafka快速入门

我已经让Zookeeper在localhost:2181上监听了,因为我运行了:

bin/zookeeper-server-start.sh config/zookeeper.properties

我有一个单独的代理监听 localhost:9092,因为我执行了

bin/kafka-server-start.sh config/server.properties

我有一个生产者发布到主题"test",因为我运行了

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
yello
is this thing on?
let's try another
gimme more

当我运行旧版API消费者时,它通过运行 有效

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

然而,当我运行新的 API 消费者时,运行时没有输出任何内容。

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning \
    --bootstrap-server localhost:9092

使用新的API,从控制台消费者订阅主题是否可行?我该如何解决?


你使用的是哪个Scala版本?你有编译Kafka吗?我在kafka_2.10-0.9.0.0.tgz上遇到了一些小问题,但是在kafka_2.101-0.9.0.0.tgz上,它完美地工作了,包括你的示例。 - vlain
好的,谢谢。这个是使用2.10版本的。如果我再尝试一次,会使用2.11版本。 - EthanP
你创建了“test”主题吗? - Hossein Vatani
16个回答

68

在我的 MAC 系统中,当我使用命令时,我遇到了 console-consumer 无法消费任何消息的同样问题。

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic

但是当我尝试时,

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0

它愉快地列出了发送的消息。这是Kafka 1.10.11中的一个错误吗?


11
这个方法对我也适用,但为什么我们需要提到分区号呢? - user482963
2
在结尾处传递参数 --partition 0 后成功运行。 - Mohammad Faisal
3
我也在2.x版本中遇到了同样的问题。这个答案解决了它。 - santoshM
1
这只是一个临时解决方案,我建议验证Zookeeper和Kafka代理上的所有警告和错误日志。一旦集群健康,就可以尝试使用引导服务器进行读取。 - havelino
1
为什么这个解决方案有效?这是Kafka的一个错误吗? - Pardhu
显示剩余3条评论

12

我遇到了这个问题,解决方法是在zookeeper中删除/brokers并重新启动kafka节点。

bin/zookeeper-shell <zk-host>:2181

然后执行:

rmr /brokers

不确定为什么这样做可以解决问题。

当我启用调试日志时,我在消费者中一遍又一遍地看到这个错误信息:

2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - 发送 GroupCoordinator 请求到 broker xx.xx.xx.xx:9092 (id: 1007 rack: null),请求的组名为 test。 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - 收到 GroupCoordinator 响应:ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}),所属组为 test。 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - 查找组 test 的协调器失败:协调器不可用。 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - 组 test 的协调器发现失败,正在刷新元数据。


我正在使用CDH 5.13和CDK 4.0(apache 2.1)。遇到了同样的问题。然而,即使按照您的建议删除了broker和topic,消费者仍然无法接收消息。 - Prashant
这个解决方案对我很有效,但并不像简单地删除zookeeper上的所有引用那么容易。我已经删除了kafka代理上的日志文件,并确保集群健康,我对集群进行了几次修正,但我没有列出所有应用的修复列表。我的建议是访问zookeeper和kafka服务器日志,尝试删除所有错误和警告信息。 - havelino

9
这篇帖子中提到的解决方案对我有效- https://stackoverflow.com/a/51540528/7568227
请检查是否:
offsets.topic.replication.factor

(或者可能与复制相关的其他配置参数)不应大于经纪人数量。 这在我的情况下是问题所在。

此修复程序后,不再需要使用--partition 0。

否则,我建议按照所提到的线程中描述的调试过程进行操作。


1
嗨Miko,感谢您在StackOverflow上回答问题 :-) 由于原始线程的重要部分是您提到的调试过程,请将其转移到您的问题中。 - Philipp Maurer
@miko - 您先生,您是真正的英雄。 - RamPrakash

8

在我的情况下,这个不起作用。

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

这个很有效

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic --partition 0

由于主题__consumer_offsets位于无法访问的代理上。基本上,我忘记了复制它。重新定位__consumer_offsets解决了我的问题。


如何重新定位 __consumer_offsets 主题? - Hoang Minh Quang FX15045
我的问题也涉及到 __consumer_offsets 主题,当我将代理移动到不同的节点时,该主题仍会在旧节点中存储数据。我只需停止所有 Kafka 服务,使用 Zookeeper CLI 手动删除此主题,然后重新启动所有 Kafka 服务,现在错误已经消失了。 - Hoang Minh Quang FX15045

5

我在我的Mac上遇到了同样的问题。 我检查了日志,发现了以下错误。

Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). 
This error can be ignored if the cluster is starting up and not all brokers are up yet.

可以通过将复制因子更改为1来解决此问题。在server.properties中添加以下行并重新启动Kafka/Zookeeper即可。
offsets.topic.replication.factor=1

有没有任何线索可以在 Docker 中运行它时设置它?我正在遵循这个教程:https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html,但是遇到了同样的问题。 - Neeraj Jain

3

我曾经遇到过同样的问题,现在我已经找到了解决方法。

当你使用 --zookeeper 时,应该将 zookeeper 地址作为参数提供。

当你使用 --bootstrap-server 时,应该将 broker 地址作为参数提供。


1
问题中已经提供了代理地址作为参数;9092端口是Kafka的默认端口。 - Jordan Pilat
1
好的,不是总是这样的,如果你下载了适用于Docker的HDP沙盒,它默认为6667。 - Loebre

2
您的 localhost 在这里是 foo。 如果将 localhost 一词替换为实际主机名,就可以正常工作。
就像这样:
生产者
./bin/kafka-console-producer.sh --broker-list \
sandbox-hdp.hortonworks.com:9092 --topic test

消费者:

./bin/kafka-console-consumer.sh --topic test --from-beginning \    
--bootstrap-server bin/kafka-console-consumer.sh --new-consumer \
--topic test --from-beginning \
--bootstrap-server localhost:9092

消费者的命令不正确。你可以请修正一下吗? - Pardhu

2
这个问题也会影响使用Flume从Kafka获取数据并将数据汇入HDFS。
为了解决上述问题:
  1. 停止Kafka brokers
  2. 连接到Zookeeper集群并删除/brokers z节点
  3. 重新启动Kafka brokers
我们使用的Kafka客户端版本和Scala版本没有任何问题。Zookeeper可能有关于broker主机的错误信息。
验证操作:
在Kafka中创建主题。
$ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning

开启一个生产者通道并向其中发送一些消息。
$ kafka-console-producer --broker-list slavenode03.cdh.com:9092 --topic rkkrishnaa3210

打开一个消费者通道,从特定主题中消费消息。

$ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning

要从Flume进行测试:

Flume代理配置:

rk.sources  = source1
rk.channels = channel1
rk.sinks = sink1

rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181
rk.sources.source1.topic = rkkrishnaa321
rk.sources.source1.groupId = flume1
rk.sources.source1.channels = channel1
rk.sources.source1.interceptors = i1
rk.sources.source1.interceptors.i1.type = timestamp
rk.sources.source1.kafka.consumer.timeout.ms = 100
rk.channels.channel1.type = memory
rk.channels.channel1.capacity = 10000
rk.channels.channel1.transactionCapacity = 1000
rk.sinks.sink1.type = hdfs
rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d
rk.sinks.sink1.hdfs.rollInterval = 5
rk.sinks.sink1.hdfs.rollSize = 0
rk.sinks.sink1.hdfs.rollCount = 0
rk.sinks.sink1.hdfs.fileType = DataStream
rk.sinks.sink1.channel = channel1

运行 Flume 代理:

flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk

观察来自消费者的日志,确认主题中的消息已被写入HDFS。

18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1
18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1
18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned.
18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920
18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.

1
使用这个:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

注意:从您的命令中删除--new-consumer
参考链接:https://kafka.apache.org/quickstart

那么它将不会使用新的消费者,问题是如何使用新的消费者获取消息。 - Jordan Pilat
在Kafka的0.9.0.0版本中,他们的新控制台消费者无法正常工作,他们提供了Java消费者但没有控制台消费者。现在,他们已经完全从后续版本中删除了“--new-consumer”。 - Apurva Gupta
这个答案在我进行更正之后进行了编辑,所以请忽略我的先前评论。 - Jordan Pilat

0
在我的情况下,无论使用哪种方法都没有起作用,然后我还将日志级别增加到config/log4j.propertiesDEBUG,启动了控制台消费者。
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic MY_TOPIC

然后得到了以下日志

[2018-03-11 12:11:25,711] DEBUG [MetadataCache brokerId=10] Error while fetching metadata for MY_TOPIC-3: leader not available (kafka.server.MetadataCache)

这里的问题是我有两个kafka节点,但其中一个已经宕机了。由于某些原因,默认情况下,如果某个分区不可用(在这种情况下是分区3),kafka-console消费者将无法消费。但是,在我的应用程序中不会发生这种情况。

可能的解决方案包括:

  • 启动宕机的代理
  • 删除主题并重新创建,这样所有分区都将放置在在线代理节点上

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接