使用Spring Kafka在一个消费者中消费多种消息类型

5

我有多个生产者可以向一个Kafka主题发送多种类型的事件。

我还有一个消费者,必须消耗所有类型的消息,并为每种类型的消息使用不同的逻辑。

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<EventOne> event) {
    logger.info("event={}", event);
}

但是在这种情况下,所有的消息都会传递到这个方法中,而不仅仅是EventOne。

如果我为每种类型的消息实现两种方法,那么所有的消息都只会传递到一个方法中。

如果我像这样实现监听器:

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<?> event) {
    logger.info("event={}", event);
}

然后我遇到了异常: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-listener-1] ERROR org.springframework.kafka.listener.LoggingErrorHandler - Error while processing: ConsumerRecord java.lang.IllegalArgumentException: Unrecognized Type: [null]

请告诉我如何实现多类型消费者?

1个回答

8

我找到了解决方案,非常简单。 所以,这个问题并不清楚,但是我使用jsonMessageConverter 就像这样:

    @Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

默认情况下,Jackson库不会为JSON添加类信息,因此它无法猜测我想要反序列化的类型。解决方案是使用注释。

@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class")

这是一个将类信息添加到JSON字符串中的方法。在消费者端,我只需编写事件的基类。

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(BasicEvent event) {

    if (event instanceof EventOne) {
        logger.info("type={EventOne}, event={}", event);
    } else {
        logger.info("type={EventTwo}, event={}", event);
    }
}

希望这些信息能对某些人有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接