有没有一种方法可以清除 Kafka 中的主题?

256

我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加fetch.size在这里并不理想,因为我实际上不想接受那么大的消息。

27个回答

5

更新:此答案适用于Kafka 0.6。对于Kafka 0.8及更高版本,请参见@Patrick的答案。

是的,停止Kafka并手动删除相应子目录中的所有文件(在Kafka数据目录中很容易找到)。在Kafka重新启动后,主题将为空。


这需要关闭代理,最多只能算是一种hack。Steven Appleyard的回答确实是绝对最好的。 - Jeff Maass
1
@MaasSql 我同意。 :) 这个答案已经两年了,是关于版本0.6的。 "alter topic" 和 "delete topic" 功能后来已经实现了。 - Wildfire
Steven Appleyard的答案和这个一样不专业。 - Banjocat
1
让应用程序以支持的方式处理删除自己的数据,比关闭该应用程序并删除您认为是所有数据文件然后重新启动它要好得多。 - Nick

5

这里有很多好的答案,但是其中没有一个关于Docker的。我花了些时间才弄清楚,在这种情况下使用broker容器是错误的(显然!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

根据我的组合文件,我应该使用zookeeper:2181而不是--zookeeper localhost:2181

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令应该是:

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望这能节省某人的时间。

此外,请注意,消息不会立即被删除,而是在日志段关闭时才会发生。


你可以很好地执行到代理。问题在于localhost:2181... 例如,您误解了Docker的网络功能。此外,并非所有Zookeeper容器都具有kafka-topics,因此最好不要以这种方式使用它。最新的Kafka安装允许使用--bootstrap-servers来更改主题,而不是使用--zookeeper - OneCricketeer
1
然而,执行Zookeeper容器似乎是错误的。我的意思是,您可以从Kafka容器中使用--zookeeper zookeeper:2181。甚至可以从server.properties文件中grep出Zookeeper行。 - OneCricketeer
@cricket_007 嘿,谢谢你,我已经更正了答案,请告诉我那里还有什么问题。 - Vladimir Semashkin

4
除了更新保留时间(retention.ms)和保留字节数(retention.bytes)之外,我注意到主题清理策略应该是“删除”(默认值)。如果是“紧凑”,它将会保留消息更长时间。也就是说,如果是“紧凑”,您还必须指定删除保留时间(delete.retention.ms)。请参考此处获取更多信息。
$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
            
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

需要监测最早/最晚偏移量是否相同来确认操作已成功完成,可使用 du -h /tmp/kafka-logs/test-topic-3-100-* 检查。

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'

26599762

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'

26599762

另一个问题是,你需要首先获取当前配置,以便在删除成功后记得恢复: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

4

Thomas的建议很好,但是不幸的是,旧版本的Zookeeper(例如3.3.6)中的zkCli似乎不支持rmr。例如比较现代Zookeeper3.3版的命令行实现。

如果你面对的是旧版本的Zookeeper,一个解决方案是使用诸如zc.zk这样的Python客户端库。对于不熟悉Python的人,需要使用pipeasy_install进行安装。然后启动Python shell (python),您可以执行以下操作:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

或者甚至

zk.delete_recursive('brokers')

如果您想从Kafka中删除所有主题。

这会在代理上留下数据。您需要将此解决方案与类似 paramiko 的工具结合使用,以便SSH到每个代理并清理实际主题数据。 - OneCricketeer

4
最简单的方法是将单个日志文件的日期设置为早于保留期限。然后,代理应该在几秒钟内清理并删除它们。这样做有几个优点:
  1. 无需关闭代理,这是一个运行时操作。
  2. 避免了出现无效偏移异常的可能性(稍后会详细介绍)。
在我使用 Kafka 0.7.x 的经验中,删除日志文件并重新启动代理可能会导致某些消费者出现无效偏移异常。这是因为代理会将偏移量重置为零(在没有任何现有日志文件的情况下),并且之前从主题消费的消费者将重新连接以请求特定(曾经有效的)偏移量。如果此偏移量恰好超出了新主题日志的范围,则不会发生任何问题,消费者将在开始或结尾处恢复。但是,如果偏移量落在新主题日志的范围内,则代理会尝试获取消息集,但会因为偏移量与实际消息不对齐而失败。
可以通过在 zookeeper 中清除该主题的消费者偏移量来缓解这种情况。但是,如果您不需要一个全新的主题,只想删除现有内容,那么简单地“touch”一些主题日志比停止代理,删除主题日志和清除某些 zookeeper 节点更容易和可靠。

如何将“单个日志文件的日期设置为早于保留期限”?谢谢。 - bylijinnan

4
如果您想在Java应用程序中以编程方式执行此操作,可以使用AdminClient的API deleteRecords。使用AdminClient允许您在分区和偏移级别上删除记录。
根据JavaDocs,此操作由版本为0.11.0.0或更高版本的代理支持。
以下是一个简单的示例:
String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);

Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}

3

在这个答案中,user644265建议临时降低主题的保留时间以解决问题。虽然这种解决方法仍然有效,但是最近的kafka-configs版本将警告--zookeeper选项已弃用:

警告:--zookeeper选项已弃用,并将在Kafka未来的版本中删除

请使用--bootstrap-server代替;例如:

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

并且

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms

2

你需要在配置文件中启用此功能。

echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 

清除主题

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

创建主题
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test

请阅读该主题。

# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning

不完全清除,但效果相似! - Oak

2
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

您需要配置retention.ms。然后,您可以使用上述修改命令将其更改为1秒(稍后可以恢复默认值)。

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000

2

使用新的AdminZkClient代替已弃用的AdminUtils来自Java:

最初的回答。

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }

你不需要Zookeeper。使用AdminClientKafkaAdminClient即可。 - OneCricketeer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接