Kafka消费者动态检测添加的主题

8
我正在使用KafkaConsumer来消费Kafka服务器(主题)中的消息...
  • 对于在启动消费者代码之前创建的主题,它可以正常工作...
但问题是,如果动态创建主题(也就是说,在消费者代码启动后),它将无法工作,但API表示它支持动态主题创建...这是您参考的链接。
使用的Kafka版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这里是JAVA代码...

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注意:我的主题名称与正则表达式匹配。如果我重新启动消费者,则它将开始读取推送到主题的消息... 非常感谢任何帮助。
4个回答

18

在apache kafka邮件档案中已经回答了这个问题,我在下面复制了答案:

消费者支持一个配置选项"metadata.max.age.ms",它基本上控制主题元数据获取的频率。默认情况下,这个值设置得相当高(5分钟),这意味着需要最多5分钟才能发现与您的正则表达式匹配的新主题。您可以将其设置为更低以更快地发现主题。

因此,在你的props中你可以:

props.put("metadata.max.age.ms", 5000);

这将导致您的用户每5秒钟发现新话题。


1
这也取决于您如何设置“auto.offset.reset”消费者属性。如果它是“latest”,则会从已知主题中选择最新/ [未被消费的] 消息(在消费者启动后),但不包括动态主题。如果您将其设置为“earliest”,并且在轮询之前放置了consumer.seekToBeginning(consumer.assignment())-仅执行一次,则它将识别动态/新主题,但每次都会从开头获取所有记录。 - Sasha Bond
我们能以某种方式强制获取元数据请求吗?例如,consumer.fetchMeta()或其他什么方法吗? - andrii

4
你可以通过连接到Zookeeper来实现。请查看示例代码。基本上,你将在Zookeeper节点/brokers/topics上创建一个监视器。当有新的子节点被添加时,表示有新的主题被添加,你的监视器将被触发。
请注意,与其他答案的区别在于,这个答案是一个触发器,而另一个是轮询 -- 这个答案尽可能接近实时,而另一个则在最好情况下会在轮询间隔内。

感谢您的回复和帮助...基本上我想使用KafkaConsumer API来实现这个,我已经自己解决了。 - siddu
@madlad 请看下面我的回答。 - bhspencer
“样例代码”链接无效,而且问题是关于消费消息,而不仅仅是了解新主题...新主题将在consumer.listTopics().keySet()中可用。 - Sasha Bond
链接已修复 -- 同时添加了一行关于这两种方法的区别。 - David Griffin

2

以下是使用KafkaConsumer API解决问题的方法。这是Java代码:

这段代码对我有用。

private static Consumer<Long, String> createConsumer(String topic) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "KafkaExampleConsumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    // Create the consumer using props.
    final Consumer<Long, String> consumer =
            new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(topic));
    return consumer;
}

public static void runConsumer(String topic) throws InterruptedException {
    final Consumer<Long, String> consumer = createConsumer(topic);

    ConsumerRecords<Long, String> records = consumer.poll(100);
    for (ConsumerRecord<Long, String> record : records)
        System.out.printf("hiiiii offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    consumer.commitAsync();
    consumer.close();
    //System.out.println("DONE");
}

使用此方法,我们可以从动态创建的主题中消费消息。最初的回答。

0
在KafkaConsumer类中使用订阅方法subscribe,该方法需要以模式为参数的主题列表来获取数据。
/** * 订阅所有与指定模式匹配的主题,以获取动态分配的分区。匹配模式将定期针对所有现有主题进行。这可以通过{@code metadata.max.age.ms}配置来控制:通过降低最大元数据年龄,消费者将更频繁地刷新元数据并检查匹配的主题。 * 有关使用{@link ConsumerRebalanceListener}的详细信息,请参见{@link #subscribe(Collection, ConsumerRebalanceListener)}。通常,在提供的模式匹配的主题发生更改和消费者组成员身份更改时触发重新平衡。组重新平衡仅在调用{@link #poll(Duration)}时才会发生。 * @param pattern 要订阅的模式 * @param listener 非空监听器实例,用于获取已订阅主题的分区分配/撤销通知 * @throws IllegalArgumentException 如果模式或侦听器为空 * @throws IllegalStateException 如果先前使用主题调用了{@code subscribe()},或者之前调用了assign(而没有随后调用{@link #unsubscribe()}),或者未配置至少一个分区分配策略 */ @Override public void
subscribe(Pattern pattern, ConsumerRebalanceListener listener) {

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接