以编程方式合并两个Avro模式

10

我有两个类似的架构,只有一个嵌套字段不同(在schema1中称为onefield,在schema2中称为anotherfield)。

schema1

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "onefield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

schema2

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "anotherfield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

我可以使用avro 1.8.0在程序上合并两个模式:
Schema s1 = new Schema.Parser().parse(schema1);
Schema s2 = new Schema.Parser().parse(schema2);
Schema[] schemas = {s1, s2};

Schema mergedSchema = null;
for (Schema schema: schemas) {
    mergedSchema = AvroStorageUtils.mergeSchema(mergedSchema, schema);
}

使用它将输入的json转换为avro或json表示:

JsonAvroConverter converter = new JsonAvroConverter();
try {
    byte[] example = new String("{}").getBytes("UTF-8");
    byte[] avro = converter.convertToAvro(example, mergedSchema);
    byte[] json = converter.convertToJson(avro, mergedSchema);
    System.out.println(new String(json));
} catch (AvroConversionException e) {
    e.printStackTrace();
}

这段代码展示了预期的输出:{"metadata":{"onefield":null,"anotherfield":null}}。问题在于我无法看到合并后的模式。如果我简单地执行System.out.println(mergedSchema),我会得到以下异常:

Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: merged schema (generated by AvroStorage).merged
    at org.apache.avro.Schema$Names.put(Schema.java:1127)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:561)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:689)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:715)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:700)
    at org.apache.avro.Schema.toString(Schema.java:323)
    at org.apache.avro.Schema.toString(Schema.java:313)
    at java.lang.String.valueOf(String.java:2982)
    at java.lang.StringBuilder.append(StringBuilder.java:131)

我称之为Avro不确定性原理 :). 看起来Avro能够使用合并后的模式,但是在尝试将模式序列化为JSON时会失败。合并适用于更简单的模式,所以我认为这是Avro 1.8.0中的一个错误。
你知道可能发生了什么或如何解决它吗?任何解决方法(例如:替代Schema序列化器)都可以。

似乎在之前版本的 Avro (1.7.6) 中也发生了这种情况。http://mail-archives.apache.org/mod_mbox/avro-user/201406.mbox/%3C1402616127624-4030220.post@n3.nabble.com%3E - Guido
2个回答

2
我发现pig util类也存在同样的问题...实际上这里有两个错误:
  • AVRO允许使用无效模式通过GenericDatumWriter序列化数据
  • piggybank util类生成无效模式,因为它对所有合并字段都使用相同的名称/命名空间(而不是保留原始名称的实例)
在更复杂的场景下,https://github.com/kite-sdk/kite/blob/master/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/SchemaUtil.java#L511可以正常工作。
    Schema mergedSchema = SchemaUtil.merge(s1, s2);

从你的例子中,我得到了以下的输出。
{
  "type": "record",
  "name": "event",
  "namespace": "foo",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "event",
        "namespace": "foo.metadata",
        "fields": [
          {
            "name": "onefield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "anotherfield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ]
      },
      "default": null
    }
  ]
}

希望这能帮到其他人。


谢谢@lake。我没有尝试过,但它看起来非常好。 - Guido

0
合并模式功能目前尚不支持Avro文件。 但是,假设您在一个目录中有多个具有不同模式的Avro文件,例如:/demo,那么您可以通过Spark读取它,并提供一个主模式文件(即.avsc文件),因此Spark将从文件中内部读取所有记录,如果任何一个文件缺少列,则会显示空值。
object AvroSchemaEvolution {
def main(args: Array[String]): Unit = {
val schema = new Schema.Parser().parse(new File("C:\\Users\\murtazaz\\Documents\\Avro_Schema_Evolution\\schema\\emp_inserted.avsc"))
val spark = SparkSession.builder().master("local").getOrCreate()
  val df = spark.read
.format("com.databricks.spark.avro").option("avroSchema", schema.toString)
.load("C:\\Users\\murtazaz\\Documents\\Avro_Schema_Evolution\\demo").show()
 }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接