我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加fetch.size
在这里并不理想,因为我实际上不想接受那么大的消息。
我将一条过大的消息推送到了本地机器上的kafka消息主题中,现在我遇到了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加fetch.size
在这里并不理想,因为我实际上不想接受那么大的消息。
更新:此答案适用于Kafka 0.6。对于Kafka 0.8及更高版本,请参见@Patrick的答案。
是的,停止Kafka并手动删除相应子目录中的所有文件(在Kafka数据目录中很容易找到)。在Kafka重新启动后,主题将为空。
这里有很多好的答案,但是其中没有一个关于Docker的。我花了些时间才弄清楚,在这种情况下使用broker容器是错误的(显然!)
## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
根据我的组合文件,我应该使用zookeeper:2181
而不是--zookeeper localhost:2181
## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
正确的命令应该是:
docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
希望这能节省某人的时间。
此外,请注意,消息不会立即被删除,而是在日志段关闭时才会发生。
localhost:2181
... 例如,您误解了Docker的网络功能。此外,并非所有Zookeeper容器都具有kafka-topics
,因此最好不要以这种方式使用它。最新的Kafka安装允许使用--bootstrap-servers
来更改主题,而不是使用--zookeeper
。 - OneCricketeer--zookeeper zookeeper:2181
。甚至可以从server.properties文件中grep出Zookeeper行。 - OneCricketeer$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
需要监测最早/最晚偏移量是否相同来确认操作已成功完成,可使用 du -h /tmp/kafka-logs/test-topic-3-100-* 检查。
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Thomas的建议很好,但是不幸的是,旧版本的Zookeeper(例如3.3.6)中的zkCli
似乎不支持rmr
。例如比较现代Zookeeper和3.3版的命令行实现。
如果你面对的是旧版本的Zookeeper,一个解决方案是使用诸如zc.zk这样的Python客户端库。对于不熟悉Python的人,需要使用pip或easy_install进行安装。然后启动Python shell (python
),您可以执行以下操作:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
或者甚至
zk.delete_recursive('brokers')
paramiko
的工具结合使用,以便SSH到每个代理并清理实际主题数据。 - OneCricketeerdeleteRecords
。使用AdminClient允许您在分区和偏移级别上删除记录。String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);
try {
adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
adminClient.close();
}
在这个答案中,user644265建议临时降低主题的保留时间以解决问题。虽然这种解决方法仍然有效,但是最近的kafka-configs
版本将警告--zookeeper
选项已弃用:
警告:--zookeeper选项已弃用,并将在Kafka未来的版本中删除
请使用--bootstrap-server
代替;例如:
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100
并且
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
你需要在配置文件中启用此功能。
echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
sudo systemctl stop kafka
sudo systemctl start kafka
清除主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
请阅读该主题。
# /opt/kafka/bin/kafka-console-consumer.sh localhost:9092 --topic flows --from-beginning
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
您需要配置retention.ms
。然后,您可以使用上述修改命令将其更改为1秒(稍后可以恢复默认值)。
Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000
使用新的AdminZkClient
代替已弃用的AdminUtils
来自Java:
最初的回答。
public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}
private void deleteTopic(String topic, KafkaZkClient zkClient) {
// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}
System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}
if (!success) {
Assert.fail("failed to create " + topic);
}
}
private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();
return topics;
}
AdminClient
或KafkaAdminClient
即可。 - OneCricketeer