Spring Kafka JsonDeserialization MessageConversionException 解析 JSON 失败,无法解析类名称,找不到该类。

14

我有两个服务需要通过 Kafka 进行通信。我们称第一个服务为WriteService,第二个服务为QueryService

WriteService一侧,我有以下生产者配置。

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

我正在尝试发送一个 com.example.project.web.routes.dto.RouteDto 类的对象。

QueryService 端,消费者配置定义如下。

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

监听器的定义如下。负载类具有完全限定名称-com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                               @Payload RouteDto payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }
当发送一条消息时,我会收到以下错误:

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.project.web.routes.dto.RouteDto]; nested exception is java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

这个错误已经很明确了。但我不理解它为什么会默认表现出这种行为。我不希望在不同服务中有相同的包,这毫无意义。
我还没有找到一种方法来禁用这个设置,并使用一个标有@Payload注释的监听器提供的类。
如何解决这个问题,而不需要手动配置映射器?
2个回答

15

如果您正在使用 spring-kafka-2.2.x,您可以通过 JsonDeserializer 的重载构造函数来禁用默认标头。有关详细信息,请参见文档

从2.2版本开始,您可以通过使用一个布尔型的 useHeadersIfPresent 参数(默认值为 true)中的重载构造函数之一来明确配置反序列化器以使用提供的目标类型并忽略标头中的类型信息。下面的示例展示了如何实现:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

如果使用版本较低,请使用MessageConverter(您可能会在spring-kafka-2.1.x及以上版本中看到此问题)。

Spring for Apache Kafka提供了一个MessageConverter抽象,其中包括MessagingMessageConverter实现以及其StringJsonMessageConverter和BytesJsonMessageConverter自定义。 您可以直接将MessageConverter注入到KafkaTemplate实例中,并使用AbstractKafkaListenerContainerFactory bean定义来设置@KafkaListener.containerFactory()属性。 下面的示例展示了如何这样做:

@Bean
 public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
  }

  @KafkaListener(topics = "jsonData",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void jsonListener(RouteDto dto) {
     ...
   }

注意:只有在方法级别声明@KafkaListener注释时才能实现此类型推断。使用类级别的@KafkaListener,负载类型用于选择要调用哪个@KafkaHandler方法,因此在选择方法之前必须已经转换过。


谢谢您的回答,是否可以在不显式提供具体类的情况下进行配置?因此反序列化程序将从@Payload中推断出类型? - CROSP
请使用在@CROSP帖子中添加的MessageConverter - Ryuzaki L
谢谢您的详尽解释,我对这个配置感到有些困惑,即使在幕后使用了“反射”,仍需要额外的步骤。我使用了 Jackson 并没有遇到这样的问题。 - CROSP
哪个版本的spring-kafka?我不想降级,但目前我正在使用spring-kafka-2.0.5而没有任何问题。 - Ryuzaki L

3

我只需要更改

Json反序列化器,就解决了这个问题。

value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

toString反序列化器

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

增加一个额外的豆子

@Bean
public StringJsonMessageConverter jsonConverter() {
    return new StringJsonMessageConverter();
}

在消费者端。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接