Kafka:ClassCastException: class org.apache.avro.generic.GenericData $ Record不能转换为class

8

我在消费者中遇到了这个异常,尝试将record.value()转换为Java对象:

ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast  to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')

生产者以以下方式发送 Java 对象,该对象是名为 PublicActivityRecord 的用户定义类型:

KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());

[...]

    this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
    this.producer.flush();

当前我在调试模式下看到ProducerRecord的值确实是PublicActivityRecord类型。

在注册表服务器上,我可以看到生产者发送架构的POST请求的日志:

Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)

在消费者方面:

protected KafkaConsumer<String, PublicActivityRecord> consumer;

[...]

consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList()));
final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration);
records.forEach(record -> {

    [...]

    PublicActivityRecord activityRecord = record.value();

这里发生了ClassCastException。

在调试模式下,我可以看到record.value的类型确实是GenericData$Record。但它无法转换为PublicActivityRecord。

序列化/反序列化的键和值是相同的:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

在模式注册表日志中,我可以看到消费者的GET请求:

"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)

我已经检查过以下内容:
  1. 生产者使用我的类型PublicActivityRecord发送了一条消息
  2. kafka代理接收到了这条消息
  3. 生产者将模式发布到模式注册表中
  4. 消费者接收到了该条消息
  5. 消费者从模式注册表中获取到了该模式
  6. 该消息的值为意外的GenericData$Record
因此,我得出的结论是问题在于我的消费者。
所以问题是:为什么消费者得到的是GenericData记录而不是预期的PublicActivityRecord
非常感谢任何线索!
2个回答

18

默认情况下,只返回通用记录。您需要进行设置。

value.deserializer.specific.avro.reader=true 

或者,在您的消费者配置中使用该常量

KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG = true


1
谢谢你的提示,它对我有很大帮助。您需要在Spring Boot应用程序中按以下方式设置所提到的配置: spring: kafka: producer: value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer properties: specific: avro: reader: true - Dany Wehbe
3
@Dany 消费者反序列化程序的属性不应该被拆开。specific.avro.reader: true 应该在一行上。 - OneCricketeer

0
我们在使用Apache Flink中的AVRO反序列化Kafka记录时遇到了相同的错误。原因是我们重命名了AVRO工具生成的POJO对象的命名空间。因此,模式的命名空间和POJO对象的命名空间不一致。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接