Generic way to convert a POJO (plain old Java object) to an Avro record

21
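I am looking for a generic way to convert a POJO to an Avro object. The implementation should stay robust against any change to the POJO class. I have implemented it by populating the Avro record explicitly (see the example below).
Is there a way to get rid of the hard-coded field names and simply populate the Avro record from the object? Is reflection the only way, or does Avro provide this functionality?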
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.reflect.ReflectData;

public class PojoToAvroExample {

    static class PojoParent {
        public final Map<String, String> aMap = new HashMap<String, String>();
        public final Map<String, Integer> anotherMap = new HashMap<String, Integer>();
    }

    static class Pojo extends PojoParent {
        public String uid;
        public Date eventTime;
    }

    static Pojo createPojo() {
        Pojo foo = new Pojo();
        foo.uid = "123";
        foo.eventTime = new Date();
        foo.aMap.put("key", "val");
        foo.anotherMap.put("key", 42);
        return foo;
    }

    public static void main(String[] args) {
        // extract the avro schema corresponding to Pojo class
        Schema schema = ReflectData.get().getSchema(Pojo.class);
        System.out.println("extracted avro schema: " + schema);
        // create avro record corresponding to schema
        Record avroRecord = new Record(schema);
        System.out.println("corresponding empty avro record: " + avroRecord);

        Pojo foo = createPojo();
        // TODO: to be replaced by generic variant:
        // something like avroRecord.importValuesFrom(foo);
        avroRecord.put("uid", foo.uid);
        avroRecord.put("eventTime", foo.eventTime);
        avroRecord.put("aMap", foo.aMap);
        avroRecord.put("anotherMap", foo.anotherMap);
        System.out.println("expected avro record: " + avroRecord);
    }
}

2
Why not use Avro's ReflectDatumWriter to serialize the POJO? - Chin Huang
I'm using Avro in a Hadoop environment. For serialization I want to use AvroParquetOutputFormat. - Fabian Braun
6 Answers

13

Are you using Spring?

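I built a mapper using a Spring feature (PropertyAccessorFactory with direct field access). The same kind of mapper could also be built with plain reflection utilities: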

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
        record.getSchema().getFields().forEach(d -> PropertyAccessorFactory.forDirectFieldAccess(object).setPropertyValue(d.name(), record.get(d.name()) == null ? record.get(d.name()) : record.get(d.name()).toString()));
        return object;
    }

}
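The answer notes that the mapper could be built without Spring. A minimal sketch of that idea using only java.lang.reflect follows. It assumes the field names are passed in as a list and uses a Map in place of GenericData.Record; in a real mapper the names would come from schema.getFields() and the values would go into record.put(...). The lookup walks up the superclass chain because ReflectData also includes inherited fields (the question's Pojo extends PojoParent). The class names ReflectiveFieldMapper, Parent and Child are hypothetical.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReflectiveFieldMapper {

    // Finds a field by name on the class or any of its superclasses
    // and makes it accessible for reading and writing.
    static Field findField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                Field field = c.getDeclaredField(name);
                field.setAccessible(true);
                return field;
            } catch (NoSuchFieldException notHere) {
                // not declared on this class; keep walking up the hierarchy
            }
        }
        throw new IllegalArgumentException("No field '" + name + "' on " + type.getName());
    }

    // Object -> values (compare mapObjectToRecord). A real mapper would take the
    // names from schema.getFields() and call record.put(name, value) instead.
    static Map<String, Object> readFields(Object object, List<String> fieldNames) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (String name : fieldNames) {
            try {
                values.put(name, findField(object.getClass(), name).get(object));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return values;
    }

    // Values -> object, the reverse direction (compare mapRecordToObject).
    // Field.set unwraps boxed values for primitive fields (Integer -> int).
    static <T> T writeFields(Map<String, ?> values, T object) {
        for (Map.Entry<String, ?> entry : values.entrySet()) {
            try {
                findField(object.getClass(), entry.getKey()).set(object, entry.getValue());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return object;
    }

    // Hypothetical example types: "name" is inherited, "count" is declared on Child
    static class Parent { String name; }
    static class Child extends Parent { int count; }

    public static void main(String[] args) {
        Child source = new Child();
        source.name = "x";
        source.count = 7;
        Map<String, Object> values = readFields(source, Arrays.asList("name", "count"));
        System.out.println(values); // {name=x, count=7}
        Child copy = writeFields(values, new Child());
        System.out.println(copy.name + " " + copy.count); // x 7
    }
}
```

Like the Spring version, this copies values as-is, so nested objects and Avro-specific value types are not converted.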

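With this mapper you can produce a GenericData.Record that is easy to serialize to Avro. When you deserialize an Avro byte array, you can use the mapper to rebuild the POJO from the deserialized record. (avroSerializer and avroDeserializer are not defined in this answer; the "topic" argument suggests they are Kafka Avro serializer/deserializer instances.)
Serialization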
byte[] serialized = avroSerializer.serialize("topic", GenericRecordMapper.mapObjectToRecord(yourPojo));

Deserialization
GenericData.Record deserialized = (GenericData.Record) avroDeserializer.deserialize("topic", serialized);

YourPojo yourPojo = GenericRecordMapper.mapRecordToObject(deserialized, new YourPojo());

2
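Nice, but it doesn't seem to handle lists and collections correctly: e.g. a List<String> is turned into a list containing a single string with all the elements, i.e. the result of the list's toString(). - Chris W.
Will this work for messages whose consumer and producer sides use different Avro namespaces, assuming everything else is the same? - Harsh Mishra
This also doesn't handle nested or recursive objects. - OneCricketeer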
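Chris W.'s observation traces back to the .toString() call in mapRecordToObject above. That call was presumably added because Avro's generic reader returns strings as org.apache.avro.util.Utf8 (a CharSequence, not a java.lang.String), but applying it to a whole value also flattens lists. A sketch of one way around this, using only the standard library: convert values recursively, turning each CharSequence into a String and descending into lists and maps (Avro's generic arrays implement java.util.List). The helper name normalize is hypothetical, and nested records (OneCricketeer's point) would still need separate handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AvroValueNormalizer {

    // Recursively converts CharSequence values (such as Avro's Utf8) to String,
    // descending into lists and maps instead of calling toString() on them.
    static Object normalize(Object value) {
        if (value instanceof CharSequence) {
            return value.toString();
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) {
                copy.add(normalize(element));
            }
            return copy;
        }
        if (value instanceof Map) {
            Map<Object, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                copy.put(normalize(entry.getKey()), normalize(entry.getValue()));
            }
            return copy;
        }
        return value; // numbers, booleans, null, etc. pass through unchanged
    }

    public static void main(String[] args) {
        // StringBuilder stands in for Utf8 here: both are CharSequences but not Strings
        List<Object> fromRecord = new ArrayList<>();
        fromRecord.add(new StringBuilder("a"));
        fromRecord.add(new StringBuilder("b"));
        Object fixed = normalize(fromRecord);
        System.out.println(fixed);                                    // [a, b]
        System.out.println(((List<?>) fixed).get(0) instanceof String); // true
    }
}
```

In the Spring mapper, normalize(record.get(d.name())) would take the place of the null check plus toString().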

8
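Here is a generic way to serialize a POJO to Avro binary with ReflectDatumWriter (the schema is derived from the class via ReflectData):
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

public static <V> byte[] toBytesGeneric(final V v, final Class<V> cls) {
    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
    // Derive the Avro schema from the class by reflection
    final Schema schema = ReflectData.get().getSchema(cls);
    final DatumWriter<V> writer = new ReflectDatumWriter<>(schema);
    final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null);
    try {
        writer.write(v, binEncoder);
        binEncoder.flush(); // binaryEncoder() is buffered, so flush before reading the bytes
    } catch (final IOException e) {
        throw new UncheckedIOException(e);
    }
    return bout.toByteArray();
}

public static void main(String[] args) {
    PojoClass pojoObject = new PojoClass();
    byte[] avroBytes = toBytesGeneric(pojoObject, PojoClass.class);
    System.out.println("Serialized " + avroBytes.length + " bytes");
}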

4
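With jackson-dataformat-avro, converting a POJO to byte[] is very easy, much like with jackson/json:
AvroSchema schema = avroMapper.schemaFor(Pojo.class); // avroMapper is a com.fasterxml.jackson.dataformat.avro.AvroMapper
byte[] avroData = avroMapper.writer(schema).writeValueAsBytes(pojo);

P.S.
Jackson handles not only JSON but also XML, Avro, Protobuf, YAML and other formats, using very similar classes and APIs.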


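After sending it as a byte[], how does the client convert the byte[] back into a POJO? Does the client need to know about the POJO in advance, or can the POJO be constructed from jackson/avro's AvroSchema? - user2441441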
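On this question: Avro's binary encoding stores no field names and no type tags, so a reader always needs the writer's schema to decode the bytes. The POJO class itself is not strictly required, since the same bytes can also be read into a GenericRecord with GenericDatumReader and the schema alone. The sketch below hand-encodes a hypothetical record with a string field uid and an int field count, following the rules in the Avro specification: int and long are zig-zag varints, strings are a length followed by UTF-8 bytes, and a record is just its fields concatenated in schema order. It is for illustration only; real code should use Avro's own encoders.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroBinarySketch {

    // Avro int/long: zig-zag mapping, then a little-endian base-128 varint
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);             // zig-zag: 0->0, -1->1, 1->2, -2->3, ...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));  // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z);                         // last byte, continuation bit clear
    }

    // Avro string: byte length encoded as a long, then the UTF-8 bytes
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {uid: string, count: int}: fields in schema order, no names
    static byte[] encodeRecord(String uid, int count) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, uid);
        writeLong(out, count);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeRecord("123", 42)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // 06 31 32 33 54
    }
}
```

The five output bytes contain nothing that says "uid" or "count", or even that the first field is a string; only the schema gives them meaning.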

2

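Two steps to convert any POJO class into an Avro GenericRecord

  1. Serialize the POJO to bytes with Jackson's AvroMapper (jackson-dataformat-avro).

  2. Read those bytes back as a GenericRecord with Avro's GenericDatumReader.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.springframework.core.io.ClassPathResource;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

public class AvroConverter {
    public static GenericRecord convertToGenericRecord(String schemaPath, SomeClass someObject) throws IOException {
        Schema schema = new Schema.Parser().setValidate(true).parse(new ClassPathResource(schemaPath).getFile());
        // Step 1: Jackson writes the POJO as Avro binary, using Jackson's AvroSchema wrapper
        ObjectWriter writer = new AvroMapper().writer(new AvroSchema(schema));
        byte[] bytes = writer.writeValueAsBytes(someObject);
        // Step 2: GenericDatumReader takes the plain Avro Schema, not the AvroSchema wrapper
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    }
}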

Gradle dependency

 // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-avro
    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-avro'


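What is this code supposed to do? ObjectWritter writter is never used (and is misspelled), and GenericDatumReader takes a Schema argument, not the AvroSchema that is passed. This code doesn't even compile. - filpa
@filpa Thanks for pointing that out. The AvroMapper is used to write the object to bytes. I had some typos. Hopefully it's clear now. - Sivaram Rasathurai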

0
Further to my comment on @TranceMaster's answer, the modified version below also works for primitive types and Java collections:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");

        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");

        record
                .getSchema()
                .getFields()
                .forEach(field ->
                    PropertyAccessorFactory
                            .forDirectFieldAccess(object)
                            .setPropertyValue(field.name(), record.get(field.name()))
                );
        return object;
    }
}

-1

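I needed something like this myself. The library you need is in the Jackson Avro JAR (jackson-dataformat-avro, per the imports below), but oddly there seems to be no way to invoke it from the avro-tools command line.

Invoke it like this: java GenerateSchemaFromPOJO com.example.pojo.Person Person.json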

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;

import org.apache.avro.Schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;

public class GenerateSchemaFromPOJO {

    public static void main(String[] args) {
        String className  = null;
        String outputFile = null;
        Writer outputWriter = null;
        try {
            if(args.length != 2) {
                System.out.println("Usage: java " + GenerateSchemaFromPOJO.class.getCanonicalName() + " classname output-schema-file.json");
                System.exit(1);
            }
            className = args[0];
            outputFile = args[1];

            Class<?> clazz = Class.forName(className);

            AvroFactory avroFactory = new AvroFactory();
            ObjectMapper mapper = new ObjectMapper(avroFactory);

            AvroSchemaGenerator gen = new AvroSchemaGenerator();
            mapper.acceptJsonFormatVisitor(clazz, gen);
            AvroSchema schemaWrapper = gen.getGeneratedSchema();

            Schema avroSchema = schemaWrapper.getAvroSchema();
            String asJson = avroSchema.toString(true);

            outputWriter = new FileWriter(outputFile);
            outputWriter.write(asJson);
        } catch (Exception ex) {
            System.err.println("caught " + ex);
            ex.printStackTrace();
            System.exit(1);
        } finally {
            if(outputWriter != null) {
                try {
                    outputWriter.close();
                } catch (IOException e) {
                    System.err.println("Caught " + e + " while trying to close outputWriter to " + outputFile);
                    e.printStackTrace();
                }
            }
        }
    }
}

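As I understand your answer, your code generates the Avro schema for a given clazz. That's not what I asked in the question; I already do that in this line: ReflectData.get().getSchema(Pojo.class);. I'm looking for a way to replace avroRecord.put(..., ...); with a generic variant. - Fabian Braun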
