How do I write a unit test for a @KafkaListener?


I am trying to write a unit test for a @KafkaListener using spring-kafka and spring-kafka-test.

My listener class:

    public class MyKafkaListener {

        @Autowired
        private MyMessageProcessor myMessageProcessor;

        @KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
        public void myMessageListener(MyMessage message) {
            myMessageProcessor.process(message);
            log.info("MyMessage processed");
        }
    }

My test class:

    @RunWith(SpringRunner.class)
    @DirtiesContext
    @EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
    @ContextConfiguration(classes = {TestKafkaConfig.class})
    public class MyMessageConsumersTest {

        @Autowired
        private MyMessageProcessor myMessageProcessor;

        @Value("${kafka.topic.01}")
        private String TOPIC_01;

        @Autowired
        private KafkaTemplate<String, MyMessage> messageProducer;

        @Test
        public void testSalesforceMessageListner() {
            MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
            messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
            verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
        }
    }

My test configuration class:

    @Configuration
    @EnableKafka
    public class TestKafkaConfig {

        @Bean
        public MyMessageProcessor myMessageProcessor() {
            return mock(MyMessageProcessor.class);
        }

        @Bean
        public KafkaEmbedded kafkaEmbedded() {
            return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
        }

        // Consumer
        @Bean
        public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(myMessageConsumerFactory());
            return factory;
        }

        // Producer
        @Bean
        public ProducerFactory<String, MyMessage> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<String, MyMessage> messageProducer() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

Is there a simple way to make this work?

Or should I test the @KafkaListener some other way? In a unit test, how do I make sure the @KafkaListener is invoked when a new message arrives in Kafka?


A very useful article on this: https://www.geekyhacker.com/2020/10/03/test-spring-kafka-consumer-and-producer-with-embeddedkafka/ - Smaillns
5 Answers

How can I make sure a @KafkaListener is invoked when there is a new message in Kafka?

Testing that kind of functionality is essentially the framework's responsibility. In your case you should concentrate on the business logic and unit test exactly your custom code, not the code compiled into the framework. Besides, there is little value in testing a @KafkaListener method that does nothing more than log the incoming message, and it is certainly hard to find a hook for a test case to verify against.

On the other hand, I really believe the business logic in your @KafkaListener method is more complicated than what you show. So rather than trying to find a hook exactly around myMessageListener(), it is better to verify the custom code you call from that method (a DB insert, a call to another service, etc.). What you do with mock(MyMessageProcessor.class) is a good way to verify the business logic.

The only real problem in your code is the duplication around the embedded Kafka: you use the @EmbeddedKafka annotation and also declare a KafkaEmbedded @Bean. You should consider removing one of them, although it isn't clear where your production code is, which really shouldn't rely on an embedded Kafka. Otherwise, if everything is in the test scope, I don't see any problems with your consumer and producer factory configuration. You definitely have the simplest possible configuration for the @KafkaListener and the KafkaTemplate. The only thing you need to do is remove the @EmbeddedKafka so you don't start the broker twice.
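To make the deduplication concrete, a minimal sketch (using only the question's own classes, nothing new) would drop the annotation from the test class and keep the KafkaEmbedded bean as the single broker:

    // Sketch following the answer's advice: no @EmbeddedKafka here, so only the
    // KafkaEmbedded @Bean declared in TestKafkaConfig starts a broker.
    @RunWith(SpringRunner.class)
    @DirtiesContext
    @ContextConfiguration(classes = {TestKafkaConfig.class})
    public class MyMessageConsumersTest {
        // ... fields and test method exactly as in the question ...
    }

Both factories then keep pointing at that single broker through kafkaEmbedded().getBrokersAsString().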

You can wrap the listener in the test case.
Given:
@SpringBootApplication
public class So52783066Application {

    public static void main(String[] args) {
        SpringApplication.run(So52783066Application.class, args);
    }

    @KafkaListener(id = "so52783066", topics = "so52783066")
    public void listen(String in) {
        System.out.println(in);
    }

}

Then:

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    @Before
    public void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
                .getListenerContainer("so52783066");
        container.stop();
        @SuppressWarnings("unchecked")
        AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
                .getContainerProperties().getMessageListener();
        CountDownLatch latch = new CountDownLatch(1);
        container.getContainerProperties()
                .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {

                    @Override
                    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
                            Consumer<?, ?> consumer) {
                        messageListener.onMessage(data, acknowledgment, consumer);
                        latch.countDown();
                    }

                });
        container.start();
        template.send("so52783066", "foo");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }

}
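A side note for newer spring-kafka-test versions: KafkaEmbedded was later replaced by EmbeddedKafkaBroker, and the JUnit 4 rule equivalent is EmbeddedKafkaRule. A rough sketch of the same broker setup under that assumption (spring-kafka-test 2.2+):

    // Assumed equivalent on spring-kafka-test 2.2+, where the JUnit 4 rule is
    // org.springframework.kafka.test.rule.EmbeddedKafkaRule; the rest of the test stays the same.
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "so52783066");

    @Before
    public void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }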

What is KafkaEmbedded? - Felipe Jorge
Don't ask questions on old answers; ask a new question instead. It is the old name for EmbeddedKafkaBroker - see the documentation - Gary Russell
Hi Gary, how will the consumer pick up the embedded Kafka broker here? The consumer configuration is created before the test case runs. - Akhil
Don't ask new questions on old answers; it won't help others find the answer. See the documentation: @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers") overrides the application property. If that doesn't solve your problem, ask a new question. - Gary Russell


Here is a working solution for the consumer based on your code. Thanks :-)

The configuration is as follows:

@TestConfiguration
@EnableKafka
@Profile("kafka_test")
public class KafkaTestConfig {

    private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    @Primary
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

        log.info("Consumer TEST config = {}", props);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        log.info("Producer TEST config = {}", props);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<String>());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
        return pf;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.setConcurrency(2);
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

    @Bean
    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = new KafkaListenerEndpointRegistry();
        return kafkaListenerEndpointRegistry;
    }
}

Put all the beans you need to include in the test in a separate class:
@TestConfiguration
@Profile("kafka_test")
@EnableKafka
public class KafkaBeansConfig {

    @Bean
    public MyProducer myProducer() {
        return new MyProducer();
    }

    // more beans
}

I created a BaseKafkaConsumerTest class so it can be reused:
@ExtendWith(SpringExtension.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@TestInstance(Lifecycle.PER_CLASS)
@DirtiesContext
@ContextConfiguration(classes = KafkaTestConfig.class)
@ActiveProfiles("kafka_test")
public class BaseKafkaConsumerTest {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Value("${spring.embedded.kafka.brokers}")
    private String brokerAddresses;

    @Autowired
    protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    protected KafkaTemplate<String, String> senderTemplate;

    public void setUp() {
        embeddedKafka.brokerProperty("controlled.shutdown.enable", true);

        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            System.err.println(messageListenerContainer.getContainerProperties().toString());
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.getPartitionsPerTopic());
        }
    }

    @AfterAll
    public void tearDown() {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            messageListenerContainer.stop();
        }

        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
        embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
    }

}

Extend the base class to test your consumer:

@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
@Import(KafkaBeansConfig.class)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {

    private static Logger log = LoggerFactory.getLogger(MYKafkaConsumerTest.class);

    @Autowired
    private MyConsumer myConsumer;

    // mocks with @MockBean

    @Configuration
    @ComponentScan({ "com.myfirm.kafka" })
    static class KafkaLocalTestConfig {
    }

    @BeforeAll
    public void setUp() {
        super.setUp();
    }

    @Test
    public void testMessageIsReceived() throws Exception {

        // mocks

        String jsonPayload = "{\"id\":\"12345\",\"cookieDomain\":\"helloworld\"}";
        ListenableFuture<SendResult<String, String>> future =
                senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

        Thread.sleep(10000);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("successfully sent message='{}' with offset={}", jsonPayload,
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("unable to send message='{}'", jsonPayload, ex);
            }
        });

        Mockito.verify(myService, Mockito.times(1))
                .update(Mockito.any(MyDetails.class));
    }
}

From what I have read in other posts, don't test the business logic this way. Just check that the call is made.
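A side note on the fixed Thread.sleep(10000) above: Mockito's timeout verification can usually replace it, so the test finishes as soon as the listener has hit the mock (a sketch reusing the answer's myService / MyDetails placeholders):

    // Sketch: wait up to 10 seconds for the interaction instead of always sleeping the full interval.
    Mockito.verify(myService, Mockito.timeout(10000).times(1))
            .update(Mockito.any(MyDetails.class));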


What is myService in your example? - Amit Rai
Probably my consumer. I don't remember exactly. I replaced the company variables. - Laura Liparulo
OK, thanks. I'm having a hard time writing tests for a Kafka listener. - Amit Rai

If you want to write integration tests with EmbeddedKafka, you can do it like this. Assume we have some KafkaListener that takes a RequestDto as its Payload.
In your test class you should create a TestConfiguration that declares the producer beans, and autowire the KafkaTemplate into your test. Also note that we inject the consumer as a SpyBean instead of autowiring it.
In the someTest method we create a latch and set up the consumer's listener method so that the latch is released when it is invoked, and the assertion happens only after the listener has received the payload.
Also note the any() ?: RequestDto() line. You should use the Elvis operator with any() only when Mockito's any() is used for a non-null Kotlin method parameter, because any() returns null first.
@EnableKafka
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EmbeddedKafka(partitions = 10, brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"])
class KafkaIgniteApplicationTests {

    @SpyBean
    private lateinit var consumer: Consumer

    @TestConfiguration
    class Config {

        @Value("\${spring.kafka.consumer.bootstrap-servers}")
        private lateinit var servers: String

        fun producerConfig(): Map<String, Any> {
            val props = mutableMapOf<String, Any>()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            return props
        }

        @Bean
        fun producerFactory(): ProducerFactory<String, String> {
            return DefaultKafkaProducerFactory(producerConfig())
        }

        @Bean
        fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory)
        }
    }

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, String>

    @Test
    fun someTest() {
        val lock = CountDownLatch(1)
        `when`(consumer.receive(any() ?: RequestDto())).thenAnswer {
            it.callRealMethod()
            lock.countDown()
        }
        val request = "{\"value\":\"1\"}"
        kafkaTemplate.send(TOPIC, request)
        lock.await(1000, TimeUnit.MILLISECONDS)
        verify(consumer).receive(RequestDto().apply { value = BigDecimal.ONE })
    }
}

This is "the" idiomatic solution. - Mr. Gung

In a unit test, how do I make sure the @KafkaListener is invoked when a new message arrives in Kafka?
Instead of the Awaitility or CountDownLatch approaches, a simpler way is to use @SpyBean to register the actual @KafkaListener bean as a Mockito spy. A spy basically records all interactions made on the real bean instance so you can verify them later. Combined with Mockito's timeout verification feature, you can make sure the verification succeeds within a certain timeout after the producer sends the message.
Something like:
@SpringBootTest(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@EmbeddedKafka(topics = {"fooTopic"})
public class MyMessageConsumersTest {

   @SpyBean
   private MyKafkaListener myKafkaListener;

   @Captor
   private ArgumentCaptor<MyMessage> myMessageCaptor;

   @Test
   public void test(){

      // create KafkaTemplate to send some message to the topic...

      verify(myKafkaListener, timeout(5000)).myMessageListener(myMessageCaptor.capture());

      // assert the KafkaListener is configured correctly such that it is invoked with the expected parameter
      assertThat(myMessageCaptor.getValue()).isEqualTo(xxxxx);
   }
}
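The elided "create KafkaTemplate" step could look roughly like the sketch below. It assumes an @Autowired EmbeddedKafkaBroker field (not shown in the answer) plus KafkaTestUtils and JsonSerializer from spring-kafka-test / spring-kafka; none of it is from the original answer.

    // Hypothetical producer setup for the test above: build a one-off KafkaTemplate
    // against the embedded broker and send a message to the listener's topic.
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker);
    KafkaTemplate<String, MyMessage> template = new KafkaTemplate<>(
            new DefaultKafkaProducerFactory<>(producerProps,
                    new StringSerializer(), new JsonSerializer<MyMessage>()));
    template.send("fooTopic", new MyMessage());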
