编写 Kafka 消费者的 JUnit 测试

6

我有一个订阅主题的kafka消费者。实现运行良好。但是,当尝试为其实现单元测试时,由于它是通过Runnable接口实现的,存在问题。

实现

@Override
public void run() {
    kafkaConsumer.subscribe(kafkaTopics);

    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
        Map<String, InventoryStock> skuMap = new LinkedHashMap<>();

        try {
            // populating sku map with consumer record
            for (ConsumerRecord<String, String> record : records) {
                populateMap(skuMap, record.value());
            }

            if (MapUtils.isNotEmpty(skuMap)) {
                // writing sku inventory with populated sku map
                inventoryDao.updateInventoryTable(INVENTORY_JOB_ID, skuMap);
            }
        } catch (Exception e) {

        }
        kafkaConsumer.commitAsync();
    }
}

我尝试使用MockConsumer来实现测试。但是它需要在实现中分配给消费者。但是实现中的消费者不会暴露到外部。以下是我的尝试。

@Before
public void onBefore() {
    MockitoAnnotations.initMocks(this);

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    skuInventoryConsumer = new SkuInventoryConsumer(consumerProps);

    KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class);

    Whitebox.setInternalState(skuInventoryConsumer, "LOGGER", LOGGER);
    Whitebox.setInternalState(skuInventoryConsumer, "kafkaConsumer", kafkaConsumerMock);

}

@Test
public void should_subscribe_on_topic() {
    consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));

    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);

    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 0L, "mykey", "myvalue0"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 1L, "mykey", "myvalue1"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 2L, "mykey", "myvalue2"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 3L, "mykey", "myvalue3"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 4L, "mykey", "myvalue4"));
}

由于这是一个可运行的代码,并且消费者没有暴露,因此该测试未按预期工作。我该如何解决?


你想要测试什么?也许你可以添加倒计时门闩来获取回调,当消费者消费时(不是Mockito的方式,而是多线程的方式)。 - Ehud Lev
你找到如何测试消费者了吗? - Himesh gosvami
不行,找不到一种方法来测试实现了Runnable的消费者。但是找到了一些测试普通类中实现的普通消费者的方法。 - codebot
你能分享一个链接吗? - Fasco
我现在没有它。给我一些时间。我会用链接联系你。 - codebot
制片人在哪里? - undefined
1个回答

0
我建议使用Mockito,就像下面的样例一样。
Consumer<String, String> kafkaConsumerLocal = mock(Consumer.class);
KafkaConsumer kafkaConsumer = spy(new KafkaConsumer("topic-name"));

ReflectionTestUtils.setField(kafkaConsumer, "threadPoolCount", 1);
ReflectionTestUtils.setField(kafkaConsumer, "consumer", kafkaConsumerLocal);

doNothing().when(kafkaConsumer).runConsumer();
doNothing().when(kafkaConsumer).addShutDownHook();
doReturn(kafkaConsumerLocal).when(consumerInit).getKafkaConsumer();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接