我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加fetch.size
在这里并不理想,因为我实际上不想接受那么大的消息。
我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加fetch.size
在这里并不理想,因为我实际上不想接受那么大的消息。
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
另一种手动清除主题的方法是:
在代理中:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
在Zookeeper中:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
再次在代理中:
sudo service kafka start
我已经阅读了几乎所有的回答,我们正在使用 Kafka Kraft 3.4.0。所以我可以为 Kraft 添加一个答案。在 Kraft 上如何完成这个操作其实跟其他版本没有什么不同,你需要一台能够在其中使用 Kafka 引导服务器和 Kafka 二进制文件的机器,并执行以下命令:
bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000
问题在于,Kafka 删除文件系统日志时并不仅仅考虑时间戳。还需要考虑日志段字节数。当磁盘上的日志大小达到分区的 segment.bytes 时,Kafka 会对日志进行卷绕,如果您还有未滚动的开放式分区偏移量,则即使将 retention.ms 设置为 1 毫秒,也不会将其删除。
如果你想要清除一个 topic,比如说每条消息都是 2000 字节;
设置 segment.bytes:
bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config segment.bytes=<小于1条消息总字节数>
设置 retention.ms:
bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000
请记住,它不会在1秒钟内神奇地清除,删除保留期应该在一秒钟内触发,但是开放段的滚动汇总需要更长时间(接近5分钟)。因此,请注意经纪人日志大小,并在您看到主题的日志大小为0时重置这些配置:
/bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --delete-config segment.bytes
/bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --delete-config retention.ms
我正在使用 Kafka 2.13 工具。现在,kafka-topics.sh 中的 --zookeeper 选项无法识别。要删除主题:
bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]
ERROR org.apache.kafka.common.errors.TopicExistsException: Topic '[topic name]' is marked for deletion.
confluentinc/cp-kafka
容器,这是删除主题的命令。docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>
成功响应:
Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
如果有人在2022年寻找更新的答案,我发现以下内容适用于Kafka版本3.3.1。这将更改“your-topic”的配置,以便消息保留1000毫秒。在消息被清除后,您可以将其设置回不同的值。
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000
你有没有考虑让你的应用程序简单地使用一个重新命名的主题?(即,一个以原始主题命名但在末尾添加了“1”的主题)。
这样也会给你的应用程序带来一个新鲜干净的主题。