有没有一种方法可以清除 Kafka 中的主题?

256

我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加fetch.size在这里并不理想,因为我实际上不想接受那么大的消息。

27个回答

452

将主题的保留时间暂时更新为一秒钟:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在较新的Kafka版本中,您还可以使用kafka-configs --entity-type topics来完成此操作。

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除完成,恢复先前的retention.ms值。


16
好的,我会尽力为您翻译。请问您需要将以下内容翻译成中文吗?"That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?" - Greg Dubicki
38
我不确定如何检查当前配置,但我认为将其重置为默认值的命令应该是:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms - aspergillusOryzae
19
或者根据版本:--delete-config retention.ms - aspergillusOryzae
64
自0.9.0版本以来,使用kafka-topics.sh修改配置已经被弃用。新的选择是使用kafka-configs.sh脚本。 例如:kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 这还可以让您检查当前保留期,例如:kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name> - RHE
8
在2.8.0版本中,“--zookeeper”也已被弃用,最好使用引导服务器代替。 kafka-configs.sh --bootstrap-server <bstserver>:9091 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000 - Alisettar Huseynli
显示剩余10条评论

110

为清空队列,您可以删除该主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

21
请记得在文件config/server.properties中添加行delete.topic.enable=true,因为所提到的命令打印的警告说:“注意:如果未将delete.topic.enable设置为true,则不会产生任何影响。” - Patrizio Bertoni
4
这并不总是即时的。有时它只会标记为删除,实际的删除会在稍后发生。 - Gaurav Khare
如果有人对这种方法感兴趣,请考虑使用被接受的答案。然而,这种方法也可以使用。但是,请记住,您还将失去分配给每个代理的分区。因此,当您重新创建主题时,根据集群的配置,您可能会遇到一些开销。另一个缺点是,如果您有活动消费者,并且auto.create.topics.enable设置为true,则可能会出现配置错误的主题。 - Ali Can

59

虽然被接受的答案是正确的,但该方法已被弃用。现在应通过 kafka-configs 进行主题配置。

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

通过这种方法设置的配置可以使用命令显示

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

4
值得一提的是:kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic - NoBrainer
1
请注意:这需要一些时间才能生效(即使是只有1条消息的主题),而且删除顺序不能保证。 - dz902

52
以下是删除名为“ MyTopic”的主题的步骤:
  1. 描述主题,并记录代理ID
  2. 停止每个代理ID的Apache Kafka守护程序。
  3. 连接到每个代理(来自第1步),并删除主题数据文件夹,例如 rm -rf /tmp/kafka-logs/MyTopic-0。针对其他分区和所有副本重复此操作。
  4. 删除主题元数据:zkCli.sh 然后 rmr /brokers/MyTopic
  5. 启动每个已停止的机器的Apache Kafka守护程序

如果您错过了第3步,则Apache Kafka将继续报告该主题存在(例如,当您运行 kafka-list-topic.sh 时)。

已在Apache Kafka 0.8.0上测试。


3
在0.8.1版本中,使用命令./zookeeper-shell.sh localhost:2181./kafka-topics.sh --list --zookeeper localhost:2181来操作ZooKeeper和列出Kafka的主题列表。 - pdeschen
3
这会删除主题,但不会删除其中的数据。需要停止代理才能执行此操作。这种方法最多只能算是个权宜之计。Steven Appleyard 的回答确实是绝对最好的。 - Jeff Maass
1
这是写作当时唯一的方式。 - Thomas Bratt
2
在Kafka 0.8.2.1上对我有效,尽管zookeeper中的主题位于/brokers/topics/<topic name here>下。 - codecraig
1
这可能是从0.9版本开始出现的问题,因为偏移量在另一个主题中进行管理,因此那些使用先前偏移量的消费者可能会看到错误 - 尽管尚未尝试过。 - Guruprasad GV
显示剩余3条评论

45

在Kafka 0.8.2中进行测试,针对快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:

delete.topic.enable=true

然后,你可以运行这个命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它,以便客户端可以继续针对空主题进行操作。


19

可以使用以下命令删除Kafka主题中所有现有消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

delete.json文件的结构应遵循以下格式:

{ "partitions": [ { "topic": "foo", "partition": 1, "offset": -1 } ], "version": 1 }

其中,offset为-1将删除所有记录。 (此命令已在kafka 2.0.1上进行了测试)


1
这不是最好的答案吗?它完全做到了所要求的。重新创建主题会因重新分区而产生副作用。更改保留期也不能保证删除所有内容。 - Zoomzoom
这是一个很好的评论,只是需要创建一个文件。接受的答案是错误的。 - undefined

14

从kafka 1.1开始

清除主题

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

等待至少1分钟,确保Kafka清除了主题。删除配置,然后恢复默认值。

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms

1
我认为你有一根多余的箭头。在我的电脑上,我能够运行bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100 - Will

8

Kafka没有直接清除/清理主题(队列)的方法,但可以通过删除该主题并重新创建来实现。

首先确保server.properties文件中存在delete.topic.enable=true,如果没有,则添加。

然后,删除主题:bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

然后再次创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

7

根据@Steven Appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我起作用。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

2
这似乎是重复了其他答案。 - OneCricketeer

5
有时,如果你的集群饱和(分区过多,或使用加密主题数据,或使用SSL,或控制器在错误的节点上,或连接不稳定),清除该主题将需要很长时间。
我遵循以下步骤,特别是当你正在使用TLS:
1:使用kafka工具运行:
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: 运行:

kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: 等主题为空后,将主题保留设置恢复为原始设置。

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

希望这能对某些人有所帮助,因为它并不容易被宣传。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接