有没有一种方法可以获取Kafka主题中的最后一条消息?

5
我有一个具有多个分区的Kafka主题,我想知道在Java中是否有一种方法可以获取该主题的最新消息。 我不关心分区,只想获取最新的消息。
我尝试过使用@KafkaListener,但仅在更新主题时才会获取消息。如果在打开应用程序后没有发布任何内容,则不返回任何信息。
也许监听器根本不是解决问题的正确方法?
2个回答

6
这段代码对我有效,你可以试试。 注释中有解释。
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        consumer.poll(Duration.ofSeconds(10));

        consumer.assignment().forEach(System.out::println);

        AtomicLong maxTimestamp = new AtomicLong();
        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();

        // get the last offsets for each partition
        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
            System.out.println("offset: "+offset);

            // seek to the last offset of each partition
            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);

            // poll to get the last record in each partition
            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {

                // the latest record in the 'topic' is the one with the highest timestamp
                if (record.timestamp() > maxTimestamp.get()) {
                    maxTimestamp.set(record.timestamp());
                    latestRecord.set(record);
                }
            });
        });
        System.out.println(latestRecord.get());

似乎有一个问题...即使偏移量为0,您的代码仍然尝试在consumer.seek(topicPartition, offset - 1)中减去1。 - BanzaiTokyo
1
如果网络缓慢或Kafka繁忙会发生什么?那么poll(10秒)可能会返回零条记录或部分结果。 - Werner Daehn
1
@WernerDaehn 需要根据网络条件进行调整,可能可以使其可配置,这是一个示例片段。 - JavaTechnical
@JavaTechnical 是的,我明白了。但据我所知,目前没有好的解决方案。你必须等待很长时间,并希望它足够长。"长"的意思取决于使用情况。有一个 Jira 项目,我已经添加了一些评论。如果其他人也能发表评论,我会非常感激。https://issues.apache.org/jira/browse/KAFKA-10009 - Werner Daehn
consumer.poll() 不应该在 foreach 循环中。这样你会轮询与分区数量相同的次数。我们应该先为每个分区寻求到末尾-1,然后只轮询一次。 - undefined

1
您需要消费每个分区的最新消息,然后在客户端进行比较(使用消息中的时间戳,如果存在)。原因是Kafka不保证分区间的顺序。在一个分区内,您可以确定具有最大偏移量的消息是推送到该分区的最新消息。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接