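In integration tests, a fixed port (for example 9092) is not recommended, because each test should be free to open its own port on the embedded instance. An implementation along these lines works:
NB: this implementation is based on JUnit 5 (Jupiter 5.7.0) and spring-boot 2.3.4.RELEASE.
Test class: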
@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {

    @Autowired
    private EmbeddedKafkaBroker kafkaEmbedded;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Value("${topic.name}")
    private String topicName;

    @Autowired
    private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;

    // Wait until every listener container has its partitions assigned before any test sends a record.
    @BeforeAll
    public void setUp() throws Exception {
        for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbedded.getPartitionsPerTopic());
        }
    }

    @Test
    public void consume_success() {
        // 'load' is the payload to publish; how it is built is not shown in this answer.
        requestKafkaTemplate.send(topicName, load);
    }

    @Configuration
    @Import({
            KafkaListenerConfig.class,
            TopicConfig.class
    })
    public static class Config {

        // Resolved from application.properties, which points at the embedded broker.
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
            final Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
            return new KafkaTemplate<>(requestProducerFactory());
        }
    }
}
Listener class:
@Component
public class Consumer {

    // containerFactory must match the bean name defined in KafkaListenerConfig.
    @KafkaListener(
            topics = "${topic.name}",
            containerFactory = "resolvedTreeListenerContainerFactory"
    )
    public void listener(
            final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
            final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
    ) {
        // Handle the record here.
    }
}
Listener configuration:
@Configuration
public class KafkaListenerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    // The topic name doubles as the consumer group id below.
    @Value(value = "${topic.name}")
    private String resolvedTreeQueueName;

    @Bean
    public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
        // CustomDeserializer is the application's own value deserializer (not shown in this answer).
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(resolvedTreeConsumerFactory());
        return factory;
    }
}
Topic configuration:
@Configuration
public class TopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String requestQueue;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // One partition, replication factor 1: enough for a single embedded broker.
    @Bean
    public NewTopic requestTopic() {
        return new NewTopic(requestQueue, 1, (short) 1);
    }
}
application.properties:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
This setting is the most important part: it binds the embedded instance's port to the KafkaTemplate and the KafkaListeners.
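To make that indirection concrete, here is a minimal plain-Java sketch of how a `${...}` placeholder can be resolved against a set of properties. It is not Spring's actual resolver, only an illustration of the idea. It assumes the embedded broker publishes its address under the key `spring.embedded.kafka.brokers`; the `resolve` helper, the class name, and the sample address `127.0.0.1:54321` are hypothetical.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Matches ${some.key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Hypothetical helper: replaces each ${key} in 'value' with the entry found in 'source'.
    static String resolve(String value, Properties source) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = source.getProperty(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties env = new Properties();
        // Stand-in for the address the embedded broker would publish; the value is made up.
        env.setProperty("spring.embedded.kafka.brokers", "127.0.0.1:54321");

        // Same right-hand side as the application.properties entry.
        String bootstrapServers = resolve("${spring.embedded.kafka.brokers}", env);
        System.out.println("spring.kafka.bootstrap-servers=" + bootstrapServers);
    }
}
```

Because the producer, consumer, and admin configurations all read `spring.kafka.bootstrap-servers`, this one property line is enough to point all of them at whatever port the embedded broker ended up on.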
With the implementation above, each test class opens its own dynamic port, which is more convenient.
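The reason a dynamic port avoids clashes can be shown without Kafka at all. The sketch below uses only the JDK: binding a `ServerSocket` to port 0 asks the operating system for any free port, which, as far as I understand, is the same mechanism an embedded broker relies on when no explicit port is configured. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {

    // Binding to port 0 lets the OS choose a currently free port.
    static ServerSocket openOnFreePort() {
        try {
            return new ServerSocket(0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Opens two sockets at the same time and returns the ports the OS assigned.
    static int[] twoFreePorts() {
        try (ServerSocket first = openOnFreePort();
             ServerSocket second = openOnFreePort()) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ports = twoFreePorts();
        // Both sockets were open simultaneously, so the two ports differ.
        System.out.println("first  = " + ports[0]);
        System.out.println("second = " + ports[1]);
    }
}
```

Two test classes that both hard-code 9092 would collide in the same situation, since the second bind fails while the first socket is still open.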
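How does @SpringBootTest know about your KafkaController component? How is it scanned or configured? - Artem Bilan
The @SpringBootApplication class is in the same package as the @SpringBootTest class, and the Receiver and Sender components are in nested packages. That way they are all scanned and configured. That is how it works. If your @SpringBootTest is in a different package, your components will not be visible, and you need to provide some @Configuration classes. - Artem Bilan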