如何从Kafka服务器获取主题中的所有消息

15

我想从服务器上获取主题中自始至今的所有消息。

示例:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning

使用以上控制台命令时,我希望能够从主题中获取自始至今的所有消息,但是我无法使用Java代码消费主题中所有的消息。

4个回答

11

您可以使用以下命令获取所有消息:

cd Users/kv/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topicName --from-beginning --max-messages 100

9
最简单的方法是启动一个消费者并消耗所有消息。现在我不知道您的主题中有多少分区,以及您是否已经有一个现有的消费者组,但您有几个选项:
请查看此API:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 1)如果您已经在同一消费者组中拥有消费者,并且仍然想从头开始消费,则应使用API文档中列出的seek选项,并为组中的每个消费者将偏移量设置为0。这将从开头开始消费。
2)否则,您可以在新的消费者组中启动几个消费者,您不必担心寻找。
PS:如果您对Kafka有更多问题,请记得提供有关您设置的更多详细信息。很多事情都取决于您如何配置基础设施以及您希望它如何运行,因此会因情况而异。

对于一个新的消费者组,您还需要设置auto.offset.reset = earliest - Matthias J. Sax
此外,如果您在文档答案中明确指定Kafka版本,那就更好了。这是因为消费者偏移量的处理方式有很大的差异。在0.9之前,这是zookeeper的工作,但现在由kafka主题“__consumer_offset”处理。我发现许多人因版本之间的变化而感到困惑。 - Manav Garg
1
我同意 - 实际上,SO文档支持一个明确的“版本”字段,但当我编写内容时它被禁用了,所以我无法包含版本 :( 或许在主题被审核和接受后,我可以设置它。 - Matthias J. Sax
1
感谢@Manav我按照您的建议使用以下代码获取了所有消息。TopicPartition topicPartition = new TopicPartition(topic, 0); List<TopicPartition> partitions = Arrays.asList(topicPartition); consumer.assign(partitions);
consumer.seekToBeginning(partitions);
- gsc0441

8
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition); 
consumer.assign(partitions);
consumer.seekToBeginning(partitions);

1

只需更改消费者组

ConsumerConfig.GROUP_ID_CONFIG - 更改为新的组ID

并设置

AUTO_OFFSET_RESET_CONFIG - earliest

示例代码-

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

这可能对开发和测试有帮助,但在生产环境中设置新的group_id并不是一个好的做法,对吧? - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接