使用Avro和Flink如何解码Kafka消息

3
我正在尝试使用Flink 1.0.3从Kafka主题中读取AVRO数据。 我只知道这个特定的Kafka主题有AVRO编码的消息,而我有AVRO模式文件。
“我的Flink代码:”
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
        properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
        properties.setProperty("group.id", "Zeeshantest");
        AvroDeserializationSchema<Event> avroSchema = new AvroDeserializationSchema<>(Event.class);
        FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
        DataStream<Event> messageStream = env.addSource(kafkaConsumer);
        messageStream.rebalance().print();
        env.execute("Flink AVRO KAFKA Test");
    }

我使用 Avro 工具和模式 "rocana.avsc" 创建了我的 Event.java 文件。
java -jar /path/to/avro-tools-1.8.1.jar compile schema rocana.avsc

这是在 Github 上上传的 rocana.avsc 文件。

AvroDeserializationSchema.java

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

运行我的程序时,出现以下错误:
17:25:30,759 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x356350cb9001857 closed
17:25:30,759 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
17:25:30,761 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (3/4) switched to FAILED with exception.
java.lang.Exception: 2
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:274)
    at org.fmr.flink.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)
17:25:30,769 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (3/4)
17:25:30,776 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Unnamed (1/4) switched

我认为我的反序列化代码不正确。有人知道我做错了什么吗?这是使用Flink从Kafka读取AVRO数据的方式,还是有更好的方法?


你确定 Kafka 中的数据是使用完全相同版本的模式进行编码的吗?通过创建一个 new SpecificDatumReader<T>(avroType),你告诉数据读取器 avroType 的模式既是读取器又是写入器的模式。如果实际上使用了不同版本的模式来编码消息,则可能会出现这些异常。 - Josh
是的,模式文件是正确的,我在logstash中使用了相同的“模式文件”和“kafka主题”,它完美地工作了。 - Zeeshan
2个回答

3

我所发布的代码与我的问题完全匹配,运行良好。

问题出在发送到kafka主题的数据上,发送了JSON和AVRO格式的数据。当我订阅一个只包含AVRO数据的不同Kafka主题时,我的代码就能正常工作了。


2
尝试以下代码来反序列化Avro记录:
Schema a; //Your Avro schema
DatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>(a);
GenericData.Record a = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));

不行,还是有同样的错误。通过以下方式创建模式:Schema schema = new Schema.Parser().parse(new File("./src/main/resources/rocana.avsc")); - Zeeshan
如果数据只是Avro编码,这应该可以工作。您能否请检查一下您是如何将数据注入Kafka的?如果您正在使用某个库,请阅读相同编码器。 - Garry
3
问题出在发送到Kafka主题的数据上,其中包括JSON和AVRO格式的数据。我订阅了另一个只包含AVRO格式数据的Kafka主题,我的代码工作正常。 - Zeeshan
@Zeeshan,既然你已经解决了问题,能否请你回答自己的问题呢?谢谢! - Jacek Laskowski

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接