我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加fetch.size
在这里并不理想,因为我实际上不想接受那么大的消息。
我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加fetch.size
在这里并不理想,因为我实际上不想接受那么大的消息。
将主题的保留时间暂时更新为一秒钟:
kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000
在较新的Kafka版本中,您还可以使用kafka-configs --entity-type topics
来完成此操作。
kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000
然后等待清除生效(持续时间取决于主题的大小)。一旦清除完成,恢复先前的retention.ms
值。
为清空队列,您可以删除该主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创建它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
config/server.properties
中添加行delete.topic.enable=true
,因为所提到的命令打印的警告说:“注意:如果未将delete.topic.enable设置为true,则不会产生任何影响。” - Patrizio Bertoniauto.create.topics.enable
设置为true
,则可能会出现配置错误的主题。 - Ali Can虽然被接受的答案是正确的,但该方法已被弃用。现在应通过 kafka-configs
进行主题配置。
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
通过这种方法设置的配置可以使用命令显示
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
。 - NoBrainerrm -rf /tmp/kafka-logs/MyTopic-0
。针对其他分区和所有副本重复此操作。zkCli.sh
然后 rmr /brokers/MyTopic
如果您错过了第3步,则Apache Kafka将继续报告该主题存在(例如,当您运行 kafka-list-topic.sh
时)。
已在Apache Kafka 0.8.0上测试。
./zookeeper-shell.sh localhost:2181
和./kafka-topics.sh --list --zookeeper localhost:2181
来操作ZooKeeper和列出Kafka的主题列表。 - pdeschen在Kafka 0.8.2中进行测试,针对快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:
delete.topic.enable=true
然后,你可以运行这个命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创建它,以便客户端可以继续针对空主题进行操作。
可以使用以下命令删除Kafka主题中所有现有消息:
kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
delete.json文件的结构应遵循以下格式:
{ "partitions": [ { "topic": "foo", "partition": 1, "offset": -1 } ], "version": 1 }
其中,offset为-1将删除所有记录。 (此命令已在kafka 2.0.1上进行了测试)
从kafka 1.1开始
清除主题
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100
等待至少1分钟,确保Kafka清除了主题。删除配置,然后恢复默认值。
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
。 - WillKafka没有直接清除/清理主题(队列)的方法,但可以通过删除该主题并重新创建来实现。
首先确保server.properties文件中存在delete.topic.enable=true
,如果没有,则添加。
然后,删除主题:bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
然后再次创建它。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
根据@Steven Appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我起作用。
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: 运行:
kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: 等主题为空后,将主题保留设置恢复为原始设置。
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
。 - aspergillusOryzae--delete-config retention.ms
- aspergillusOryzaekafka-configs.sh --bootstrap-server <bstserver>:9091 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000
- Alisettar Huseynli