Spring Boot Kafka类反序列化 - 不在受信任的包中

8

我知道这个问题非常普遍,但是在尝试不同的解决方案后,我没有找到任何一个有效的解决方法。 我想在收到 Kafka 消息时反序列化字符串以及我的自定义类对象。 与字符串一起使用很好,但是与我的类不行。 我已在消费者配置中添加了可信包(其中com.springmiddleware.entities是我的类所在的包):

@Bean
    public Map<String, Object> consumerConfigs() {


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");

        return props;
    }

我在我的application.yml文件中有这个内容:

spring:
kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'

并将这些行添加到application.properties

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer 
spring.kafka.producer.properties.spring.json.add.type.headers=false

但是一直显示以下错误:
org.apache.kafka.common.errors.SerializationException:在偏移1的topic2-0分区上反序列化键/值时出错。如果需要,请寻找过去的记录以继续消耗。原因是:java.lang.IllegalArgumentException:类'com.springmiddleware.entities.Crime'不在可信包中:[java.util,java.lang]。如果您认为此类是安全的进行反序列化,请提供其名称。如果仅由受信任的源进行序列化,则还可以启用全部信任(*)。
更新
ReceiverConfig:
@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
                new JsonDeserializer<>());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

更新2

Listener Class (Receiver): 

    @KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {


     private CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

   @KafkaHandler
    public void listen(@Payload Crime message) {

            System.out.println("Received " + message);
    }

   @KafkaHandler
    public void listen(@Payload String message) {

        System.out.println("Received " +  message);
}

你正在使用哪个版本的Spring Kafka? - Not a JD
@NotaJD 你好,我正在使用2.2.4版本。 - Usr
你能展示完整的Consumerconfig文件吗,@TodorokiM。 - Ryuzaki L
@Deadpool 使用该类进行编辑。 - Usr
1个回答

11

只需使用重载的JsonDeserializer构造函数。

从2.2版本开始,您可以通过使用带有布尔值useHeadersIfPresent的重载构造函数之一来明确配置反序列化器以使用提供的目标类型并忽略标题中的类型信息(默认情况下为true)。

以下示例显示如何执行此操作:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

你的代码:

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
            new JsonDeserializer<>(Object.class,false));
}

现在可以在类级别上使用@KafkaListener

@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

@KafkaHandler
public void listen(Cat cat) {
    ...
}

@KafkaHandler
public void listen(Hat hat) {
    ...
}

@KafkaHandler(isDefault = true)
public void delete(Object obj) {
    ...
   }

}

谢谢你,但我需要使用未指定类的反序列化器,因为我要反序列化不同类型的对象 - 字符串和myclass,在我的情况下。 - Usr
谢谢,现在它不再出现那个问题了,但我遇到了另一个错误。它告诉我处理时出错:ConsumerRecord - org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap。 - Usr
展示监听器类,你正在批处理吗?@TodorokiM - Ryuzaki L
只需添加另一个带有LinkedHashMap参数的@KafkaHandler方法,如果您为有效负载提供适当的POJO,则无论该主题中包含什么类型的有效负载,它都会正常工作。@TodorokiM - Ryuzaki L
解决了,我使用了ObjectMapper将LinkedHashMap转换为我的类对象。非常感谢! - Usr
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接