使用Spring Boot的简单嵌入式Kafka测试示例

53

编辑说明:GitHub工作示例


我在搜索互联网时找不到一个可行且简单的嵌入式Kafka测试示例。

我的设置是:

  • Spring Boot
  • 同一个类中有多个@KafkaListener监听不同主题的方法
  • 启动良好的嵌入式Kafka测试
  • Kafkatemplate测试已发送到主题,但即使经过长时间的睡眠,@KafkaListener方法仍未收到任何信息
  • 日志中只显示来自Kafka的信息,没有警告或错误。

请帮忙。大多数示例都是过度配置或过度设计。我相信它可以简单地实现。谢谢各位!

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        //I will do database stuff here which i could check in db for testing
    }
}
私有静态字符串SENDER_TOPIC =“test.kafka.topic”;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
        Thread.sleep(10000);
    }

@pvpkiran,这仍然无法正常工作。测试只测试自身,但当我将发送部分带到我的主题时,它从未到达我的KafkaListener。 - Yuna Braska
@ArtemBilan 因为方法上有 [@KafkaListener] 注解,所以我需要做些其他的事情吗? - Yuna Braska
没错,这个测试需要使用该组件引导应用程序上下文。 - Artem Bilan
没错,但是 @SpringBootTest 怎么知道你的 KafkaController 组件呢?它是如何被扫描或配置的? - Artem Bilan
请注意样例中@SpringBootApplication类与@SpringBootTest在同一个包中。而ReceiverSender组件则在嵌套的包中。这样,它们都能被清晰地扫描和配置。这就是它的工作原理。如果你的@SpringBootTest在不同的包中,那么你的组件将不可见,你需要提供一些@Configuration类。 - Artem Bilan
显示剩余3条评论
4个回答

42

以下配置使嵌入式Kafka测试正常运行:

在测试类上使用注解。

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

设置方法的注释之前

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}
注意:我没有使用@ClassRule来创建嵌入式Kafka,而是使用自动装配的方式:@Autowired embeddedKafka
@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

希望这能帮到您!

编辑:测试配置类标记为@TestConfiguration

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}

现在@Test方法将自动装配KafkaTemplate并使用它发送消息。

kafkaTemplate.send(topic, data);

已经更新了答案代码块,并添加了上述行。


1
谢谢!这听起来很不错,但是[@EmbeddedKafka]和[kafkaListenerEndpointRegistry]从哪里来?你能发一个带有导入的完整示例吗? - Yuna Braska
由于我们已经使用@EnableKafka@EmbeddedKafka注释来注释我们的类,因此您可以在测试类中自动装配两者。在第一个代码块中,@Autowired KafkaEmbedded kafkaEmbedded已经存在,就像这样,您也可以为kafkaListenerEndpointRegistry进行自动装配。 - donm
1
我仍然不知道"@EmbeddedKafka"是从哪里来的。需要哪个依赖项?我目前正在使用"spring-kafka-test"。 - Yuna Braska
2
TestConfig 最好在 KafkaConsumerTest 内部类中声明。在这种情况下:a) 它必须是 static b) 必须将 KafkaEmbedded 作为 producerFactory 方法的参数注入 c) 将 ProducerFactory 作为 kafkaTemplate 方法的参数注入,然后使用它来代替调用 producerFactory() - dav.garcia
2
setup() 方法中使用 ContainerTestUtils.waitForAssignment(..) 对我们非常有帮助。我们遇到了消费者在另一个测试类之后挂起的情况,导致下一个测试的消费者没有接收到任何消息。我们还使用了 @DirtiesContext(AFTER_CLASS) - Tom AsIdea
显示剩余6条评论

26

由于被接受的答案对我来说无法编译或运行。我发现了另一种基于https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/的解决方案,我想与您分享。

依赖项为 'spring-kafka-test' 版本:'2.2.7.RELEASE'

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {

    private static final String TEST_TOPIC = "testTopic";

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void testReceivingKafkaEvents() {
        Consumer<Integer, String> consumer = configureConsumer();
        Producer<Integer, String> producer = configureProducer();

        producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));

        ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo(123);
        assertThat(singleRecord.value()).isEqualTo("my-test-value");

        consumer.close();
        producer.close();
    }

    private Consumer<Integer, String> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
                .createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

5
你在测试嵌入式Kafka本身吗? - Manish Bansal
java.lang.NoClassDefFoundError: kafka/common/KafkaExceptionJava.lang.NoClassDefFoundError:kafka/common/KafkaException - Debadatta
在寻找嵌入式Kafka配置的最简单选项时,我发现了这个。感谢您发布它。 - Naveen Kulkarni

8

我现在解决了这个问题。

@BeforeClass
public static void setUpBeforeClass() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

在我调试的时候,我发现嵌入式kaka服务器正在使用一个随机端口。

我找不到它的配置,所以我将kafka配置设置为与服务器相同。对我来说看起来仍然有点丑陋。

我希望只有 @Mayur 提到的那一行。

@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})

但是在互联网上找不到正确的依赖项。


10
您可以在应用程序属性文件中设置spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers},以进行测试,这样应该可以正常工作。此值从EmbeddedKafka启动时分配的随机端口填充。 - Flashpix
1
这个注解@EmbeddedKafka在我的情况下来自spring-kafka-test-2.6.5。我在pom中有一个对spring-kafka-test的依赖,并且我正在使用spring-boot 2.4.2版本。@Sylhare - Aldo Inácio da Silva

2

在集成测试中,不建议使用固定端口(例如9092),因为多个测试应该具有灵活性以从嵌入实例中打开自己的端口。因此,以下实现类似于:

NB:此实现基于junit5(Jupiter:5.7.0)和spring-boot 2.3.4.RELEASE

测试类:

@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {

    @Autowired
    private EmbeddedKafkaBroker kafkaEmbedded;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @BeforeAll
    public void setUp() throws Exception {
        for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbedded.getPartitionsPerTopic());
        }
    }

    @Value("${topic.name}")
    private String topicName;

    @Autowired
    private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;

    @Test
    public void consume_success() {
        requestKafkaTemplate.send(topicName, load);
    }


    @Configuration
    @Import({
            KafkaListenerConfig.class,
            TopicConfig.class
    })
    public static class Config {

        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
            final Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
            return new KafkaTemplate<>(requestProducerFactory());
        }
    }
}

监听器类:

@Component
public class Consumer {
    @KafkaListener(
            topics = "${topic.name}",
            containerFactory = "listenerContainerFactory"
    )
    @Override
    public void listener(
            final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
            final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
    ) {
        
    }
}

监听器配置:

@Configuration
public class KafkaListenerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String resolvedTreeQueueName;

    @Bean
    public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(resolvedTreeConsumerFactory());
        return factory;
    }

}

主题配置:

@Configuration
public class TopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String requestQueue;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic requestTopic() {
        return new NewTopic(requestQueue, 1, (short) 1);
    }
}

application.properties:

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

这项任务是将嵌入式实例端口与KafkaTemplate和KafkaListners绑定的最重要任务。

按照上述实现,您可以为每个测试类打开动态端口,这样会更加方便。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接