如何使用Spring在创建Kafka主题时配置保留策略?

15
我需要在创建主题时配置特定主题的保留策略。我尝试寻找解决方案,只能找到以下命令级别的修改命令:

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000

有人能告诉我如何在创建期间配置它,类似于Spring-MVC中的XML或属性文件配置吗?
3个回答

19

使用Spring Kafka,您可以通过在应用程序上下文中声明@Bean来创建新主题。这将需要在应用程序上下文中具有KafkaAdmin类型的bean,如果使用Spring Boot,则将自动创建该bean。您可以按如下方式定义您的主题:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}

如果您未使用Spring Boot,则还需定义KafkaAdmin bean:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}
如果您想编辑现有主题的配置,则需要使用AdminClient,以下是更改主题级别上的retention.ms的代码片段:
Map<String, Object> config = new HashMap<>();                
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
                         
AdminClient client = AdminClient.create(config);
                         
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
            
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
configs.put(resource, Arrays.asList(op));

AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
        alterConfigsResult.all();

可以使用这个@PostConstruct方法自动设置配置,该方法接受NewTopic bean。


    @Autowired
    private Set<NewTopic> topics;

    @PostConstruct
    public void reconfigureTopics() throws ExecutionException, InterruptedException {

        try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
            adminClient.incrementalAlterConfigs(topics.stream()
                .filter(topic -> topic.configs() != null)
                .collect(Collectors.toMap(
                    topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                    topic -> topic.configs().entrySet()
                        .stream()
                        .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                        .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                        .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                        .collect(Collectors.toList())
                )))
                .all()
                .get();
        }

    }

感谢提供示例代码。@ Sergi,如果我们尝试更改不存在主题的保留方式,alterConfigsResult.all() 会抛出任何异常吗?我们如何知道是否正在修改现有主题? - Rachan R K
1
更新现有主题的配置不会抛出任何异常。您可以使用describeConfigs方法获取现有主题的当前配置。 - Sergi Almar

1

0

要使用AdminClient编程方式创建一个具有指定保留时间的主题,请按照以下步骤进行:

NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
topic.configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString()));
adminClient.createTopics(List.of(topic));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接