有没有一种方法可以清除 Kafka 中的主题?

256

我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加fetch.size在这里并不理想,因为我实际上不想接受那么大的消息。

27个回答

2
使用您的应用程序组(GroupName应与应用程序kafka组名称相同),清除特定主题中的所有消息。 ./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

这种方法存在问题(在0.8.1.1中测试过)。如果一个应用程序订阅了两个或多个主题:topic1和topic2,并且控制台消费者清理了topic1,不幸的是它也会删除topic2的无关消费者偏移量,导致重放topic2的所有消息。 - jsh
这不会清除/清理主题。此外,与“kafka-consumer-groups --reset-offsets”相比,这将花费太长时间。 - OneCricketeer

2

另一种手动清除主题的方法是:

在代理中:

  1. 停止Kafka代理
    sudo service kafka stop
  2. 删除所有分区日志文件(应在所有代理上完成)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

在Zookeeper中:

  1. 运行Zookeeper命令行界面
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. 使用zkCli删除主题元数据
    rmr /brokers/topic/<some_topic_name>

再次在代理中:

  1. 重新启动代理服务
    sudo service kafka start

您需要停止并从每个具有副本的代理中删除文件,这意味着在执行此操作时可能会导致客户端停机。 - OneCricketeer
1
你说得对,这个方法只是让你实际看到Kafka中一些东西的存储和管理位置。但这种暴力方法绝对不适用于正在运行的生产系统。 - Danny Mor

1

我已经阅读了几乎所有的回答,我们正在使用 Kafka Kraft 3.4.0。所以我可以为 Kraft 添加一个答案。在 Kraft 上如何完成这个操作其实跟其他版本没有什么不同,你需要一台能够在其中使用 Kafka 引导服务器和 Kafka 二进制文件的机器,并执行以下命令:

bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000

问题在于,Kafka 删除文件系统日志时并不仅仅考虑时间戳。还需要考虑日志段字节数。当磁盘上的日志大小达到分区的 segment.bytes 时,Kafka 会对日志进行卷绕,如果您还有未滚动的开放式分区偏移量,则即使将 retention.ms 设置为 1 毫秒,也不会将其删除。

如果你想要清除一个 topic,比如说每条消息都是 2000 字节;

设置 segment.bytes:

bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config segment.bytes=<小于1条消息总字节数>

设置 retention.ms:

bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000

请记住,它不会在1秒钟内神奇地清除,删除保留期应该在一秒钟内触发,但是开放段的滚动汇总需要更长时间(接近5分钟)。因此,请注意经纪人日志大小,并在您看到主题的日志大小为0时重置这些配置:

/bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --delete-config segment.bytes
/bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --delete-config retention.ms


0

我正在使用 Kafka 2.13 工具。现在,kafka-topics.sh 中的 --zookeeper 选项无法识别。要删除主题:

bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]

请注意,如果您在已删除的主题中有大量数据,则可能需要等待一段时间才能再次创建相同的主题。当您尝试创建相同的主题时,您可能会收到以下错误信息:

ERROR org.apache.kafka.common.errors.TopicExistsException: Topic '[topic name]' is marked for deletion.


0
如果您正在使用confluentinc/cp-kafka容器,这是删除主题的命令。
docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功响应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

-1

如果有人在2022年寻找更新的答案,我发现以下内容适用于Kafka版本3.3.1。这将更改“your-topic”的配置,以便消息保留1000毫秒。在消息被清除后,您可以将其设置回不同的值。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000

这个相同的命令已经被提供了。 - OneCricketeer

-2

你有没有考虑让你的应用程序简单地使用一个重新命名的主题?(即,一个以原始主题命名但在末尾添加了“1”的主题)。

这样也会给你的应用程序带来一个新鲜干净的主题。


但这会留下垃圾供Kafka管理员处理,所有使用相同主题的其他客户端都需要进行更新。 - OneCricketeer
是的,生产者和消费者需要连接到新主题。通常,主题数据将过期(基于您的保留设置)并被清除,因此我认为Kafka管理员不需要在这里做任何工作。 - Andrew Norman
  1. 所有客户端都需要进行代码更改。在企业环境中,有多个客户端,这并不是真正可行的。
  2. 集群有一个主题限制(尽管数量在几千个左右)。应定期清除空置、废弃的主题。
  3. 创建新主题并不能真正回答问题。
- OneCricketeer
@OneCricketer 是的,在公共kafka有许多客户端消费的情况下,它是不起作用的,但在许多企业设置中,这绝对是一种有用的策略,其中kafka是私有的,主题的消费者由单个运营组管理。 OP是关于一个生活在本地机器上的kafka主题,我的解决方案绝对适用于该用例。 - Andrew Norman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接