使用Spring嵌入式Kafka测试@KafkaListener

29

我正在尝试为我使用Spring Boot 2.x开发的Kafka监听器编写单元测试。由于这是一个单元测试,因此我不想启动完整的Kafka服务器或Zookeeper实例。因此,我决定使用Spring Embedded Kafka。

我的监听器定义非常基本。

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

此外,验证接收消息后的锁存器计数器是否为零的测试非常简单。

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

不幸的是,测试失败了,我无法理解原因。是否可以使用KafkaEmbedded的一个实例来测试用注释@KafkaListener标记的方法?

所有代码都分享在我的 GitHub 仓库kafka-listener中。

感谢大家。


请查看我回答的编辑;我没有注意到您没有使用boot的配置属性来消费者。 - Gary Russell
2个回答

18

在消费者被分配主题/分区之前,您可能会发送消息。设置属性...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

如果未指定版本,则默认为latest

这就像在控制台使用--from-beginning

编辑

哦,您没有使用boot的属性。

添加

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

编辑2

顺便说一下,您可能还应该对template.send()的结果(一个Future<>)执行get(10L, TimeUnit.SECONDS)以确保发送成功。

编辑3

为了在测试中覆盖偏移重置,您可以像对代理地址所做的那样进行操作:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

然而,请记住,这个属性只适用于组第一次消费的情况。为了每次应用程序启动时都从结尾开始,您必须在启动期间寻找结尾。

此外,我建议将enable.auto.commit设置为false,以便容器负责提交偏移量,而不仅仅依赖于消费者客户端在时间表上执行此操作。


1
谢谢。将auto.offset.reset属性设置为earliest就可以了 :) - riccardo.cardin
然而,如果我需要将latest作为auto.offset.reset的值,该如何使测试工作?非常感谢。 - riccardo.cardin
这怎么测试包含@KafkaListener方法的“Listener”类呢?我看到了CountDownLatch断言,但从未看到过任何方法被调用的断言... - Rob
@KafkaListener 方法中的代码会倒计时,因此它被调用。然而,通常情况下不会这样做。最有可能的是监听器会调用一个服务,您应该注入一个模拟或存根服务。 - Gary Russell

1
也许有人会发现这很有用。我遇到了类似的问题。本地测试运行正常(一些检查是在Awaitility.waitAtMost中执行的),但在Jenkins管道中,测试失败了。
解决方案就像最受欢迎的答案中所述,设置auto-offset-reset=earliest。当测试运行时,您可以通过查看测试日志来检查是否正确设置了配置。Spring会输出生产者和消费者的配置。

非常有用,谢谢!当我使用Intellij时,我也遇到了收集覆盖率的同样问题。 - rios0rios0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接