如何检查Kafka消费者是否就绪

11

我将Kafka提交策略设置为“最新”,但是缺少前几条消息。如果在开始发送消息到输入主题之前等待20秒,一切都按预期进行。我不确定问题是否在于消费者需要较长时间来重新平衡分区。有没有办法在开始轮询之前知道消费者已经准备好了?

7个回答

3
  • 您可以使用consumer.assignment(),它将返回可用于该主题的所有分区集,并验证是否已分配所有可用分区。

  • 如果您正在使用spring-kafka项目,则可以包含spring-kafka-test依赖项,并使用以下方法等待主题分配,但您需要有容器。 ContainerTestUtils.waitForAssignment(Object container, int partitions);


2
我正在使用方法级别的@KafkaListener注解来定义消费者。在测试中如何获取容器?尝试自动装配MessageListenerContainer,但它会抱怨没有合格的bean。 - bfrguci

2
你可以这样做:

我有一个从kafka主题读取数据的测试。
因此,在多线程环境中不能使用KafkaConsumer,但是可以传递参数“AtomicReference assignment”,在消费者线程中更新它,并在另一个线程中读取它。

例如,项目中用于测试的工作代码片段:

    private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // print the topic name
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (assignments.size() > 0) {
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            needStop.set(true);
            break;
        }
    }
    System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}

private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {

    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    System.out.println("Subscribed to topic: " + readFromKafka);
    consumer.subscribe(Collections.singletonList(readFromKafka));

    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        assignment.set(consumer.assignment());
        ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
        events.addAll(CommonUtils4Tst.readEvents(records));

        if (readTimeout == -1) {
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }

    needStop.set(true);

    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}

P.S.
needStop - 全局标志,以便在失败或成功的情况下停止所有运行线程
events - 要检查的对象列表
readTimeout - 我们将等待多长时间才能读取所有数据,如果readTimeout == -1,则在读取任何内容时停止


1
感谢Alexey(我也投了赞成票),我似乎已经解决了我的问题,基本上遵循了同样的想法。
只是想分享我的经验...在我们的情况下,我们使用Kafka在请求和响应方式中,有点像RPC。请求被发送到一个主题,然后在另一个主题上等待响应。遇到类似的问题,即漏掉第一个响应。
我尝试了... KafkaConsumer.assignment();重复执行(带有Thread.sleep(100);),但似乎没有帮助。添加KafkaConsumer.poll(50);似乎已经使消费者(组)处于就绪状态,并且也收到了第一个响应。测试了几次,现在一直工作正常。
顺便说一下,测试需要停止应用程序和删除Kafka主题,并为了保险起见,重新启动Kafka。
PS:只调用poll(50);而没有assignment();获取逻辑,如Alexey所提到的,可能不能保证消费者(组)已经准备好。

1
你可以修改一个AlwaysSeekToEndListener(仅监听新消息)以包含回调函数:
public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
    private final Consumer<K, V> consumer;
    private Runnable callback;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public AlwaysSeekToEndListener(Consumer<K, V> consumer, Runnable callback) {
        this.consumer = consumer;
        this.callback = callback;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
        if (callback != null) {
            callback.run();
        }
    }
}

通过latch回调函数订阅:

CountDownLatch initLatch = new CountDownLatch(1);

consumer.subscribe(singletonList(topic), new AlwaysSeekToEndListener<>(consumer, () -> initLatch.countDown()));

initLatch.await(); // blocks until consumer is ready and listening

然后继续启动您的生产者。

0

如果您的策略设置为latest - 如果没有先前提交的偏移量,则会生效 - 但是您没有先前提交的偏移量,那么您不必担心“丢失”的消息,因为您告诉Kafka不关心已经发送到您的消费者准备好的“先前”消息。

如果您关心“先前”的消息,则应将策略设置为earliest。

无论如何,无论策略如何,您看到的行为都是短暂的,即一旦在Kafka中保存了提交的偏移量,在每次重新启动时,消费者将从之前离开的地方继续进行。


我的需求是这样的,我需要发送记录并对其进行一些处理。读取的消息不应该被重新处理。而且我不需要通过将设置为最早来读取所有消息,因为在我的情况下这没有太多意义。 - Nagireddy Hanisha
很抱歉,您的期望与Kafka语义不匹配。 如果将auto.offset.reset设置为latest,则不应期望获取发送到主题的所有消息。消费者和生产者完全异步工作。 如果您想要获取已发送到主题的所有内容,请将重置设置为earliest。无论如何,一旦消费者组建立并提交了偏移量,重置策略将变得无关紧要。 - Edoardo Comar

0

在进行一些测试之前,我需要知道kafka消费者是否准备好了,所以我尝试使用consumer.assignment(),但它只返回分配的分区集合,但有一个问题,使用这个方法我无法看到分配给组的这些分区是否已经设置了偏移量,所以后来当我尝试使用消费者时,它没有正确地设置偏移量。

解决方案是使用committed(),这将为您提供给定分区的最后提交偏移量。

因此,您可以执行以下操作:consumer.committed(consumer.assignment())

如果还没有分配分区,它将返回:

{}

如果已经分配了分区,但还没有偏移量:
{name.of.topic-0=null, name.of.topic-1=null}

但如果存在分区和偏移量:

{name.of.topic-0=OffsetAndMetadata{offset=5197881, leaderEpoch=null, metadata=''}, name.of.topic-1=OffsetAndMetadata{offset=5198832, leaderEpoch=null, metadata=''}}

有了这些信息,您可以使用类似以下的内容:

consumer.committed(consumer.assignment()).isEmpty();
consumer.committed(consumer.assignment()).containsValue(null);

有了这些信息,您可以确信Kafka消费者已经准备就绪。


0

我在使用EmbeddedKafka进行测试时遇到了类似的问题。

免责声明。 我的方法可能看起来不像“kafka-way”,但它在尊重一些权衡的同时完成了工作。当然,它不应该在任何地方使用,只用于测试。

一般来说,测试包括以下步骤:

  1. 创建消费者
  2. 将一些消息发布到主题
  3. 期望只有特定的消息被消费

因此,我正在寻找具有auto.offset.reset=latest语义的保证,以便分配的主题已准备好进行轮询。最后,我决定使用特殊消息来标记消费者已准备就绪:

public class ConsumerHelper {
    
    public static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker, Set<String> topics) {
        var consumer = buildConsumer(broker);
        if (!CollectionUtils.isEmpty(topics)) {
            var producer = buildUtilProducer(...);
            var key = "util-message-key" + UUID.randomUUID(); //key must be unique for every method call
            topics.forEach(
                    topic -> producer.send(new ProducerRecord<>(topic, key, new Object()))
            );
            var uncheckedTopics = new HashSet<>(topics);
            consumer.subscribe(topics);
            do {
                consumer.poll(Duration.ofMillis()).forEach(record -> {
                    if (key.equals(record.getKey())) {
                        uncheckedTopics.remove(record.topic())
                    }
                });
                consumer.commitSync()
            } while (!uncheckedTopics.isEmpty() /* you may add some timeout check logic here if needed */)
        }
        return consumer;

    }


    /**
     * consumer builder method, e.g. with KafkaTestUtils
     *
     * @implSpec consumer group id must be unique, {@code auto.offset.reset} must be setted to {@code earliest}
     */
    private static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker) {
        var randomGroupId = "group-id-" + UUID.randomUUID(); //consumer group id must be unique
        var consumerProps = KafkaTestUtils.consumerProps(randomGroupId, "true", broker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //this is important
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserilizer.class);
        //some extra consumer props if needed
        //...
        //
        return new KafkaConsumer<>(consumerProps);

    }

    /**
     * util producer builder method, e.g. with KafkaTestUtils
     */
    private static KafkaConsumer<String, Object> buildUtilProducer() {
        //...
    }

}

所有使用公共方法构建的“KafkaConsumer”都已准备好立即消费新消息。

显然的限制:测试不应同时运行。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接