我想从服务器上获取主题中自始至今的所有消息。
示例:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
使用以上控制台命令时,我希望能够从主题中获取自始至今的所有消息,但是我无法使用Java代码消费主题中所有的消息。
我想从服务器上获取主题中自始至今的所有消息。
示例:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
使用以上控制台命令时,我希望能够从主题中获取自始至今的所有消息,但是我无法使用Java代码消费主题中所有的消息。
您可以使用以下命令获取所有消息:
cd Users/kv/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topicName --from-beginning --max-messages 100
seek
选项,并为组中的每个消费者将偏移量设置为0。这将从开头开始消费。TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
只需更改消费者组
ConsumerConfig.GROUP_ID_CONFIG - 更改为新的组ID
并设置
AUTO_OFFSET_RESET_CONFIG - earliest
示例代码-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
auto.offset.reset = earliest
。 - Matthias J. Saxconsumer.seekToBeginning(partitions); - gsc0441