使用Spark DataFrame演化模式

3
我正在处理一个Spark数据框架,它可以从三个不同的模式版本中加载数据。
// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Additional "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }

我可以通过检查模式是否包含字段"C",如果不包含,则为数据框添加新列来处理额外的"C"。然而,我无法想出如何为子对象创建字段。

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "2", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
    if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
        df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
    }
    // Not sure what to put here. The fieldNames does not contain the "A.D"

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to C for " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to A.D for " + version);
    }
}
2个回答

7

JSON数据源不太适用于有不断演化的模式的数据(考虑使用Avro或Parquet),但简单的解决方案是对所有数据源使用相同的模式,并使新字段可选/可空:

import org.apache.spark.sql.types.{StructType, StructField, LongType}

val schema = StructType(Seq(
  StructField("A", StructType(Seq(
    StructField("B", LongType, true), 
    StructField("D", LongType, true)
  )), true),
  StructField("C", LongType, true)))

您可以像这样向DataFrameReader传递schema
val rddV1 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 } }"))
val df1 = sqlContext.read.schema(schema).json(rddV1)

val rddV2 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 }, \"C\": 2 }"))
val df2 = sqlContext.read.schema(schema).json(rddV2)

val rddV3 = sc.parallelize(Seq("{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }"))
val df3 = sqlContext.read.schema(schema).json(rddV3)

这样,您将获得一个结构一致的内容,不受变量影响:

require(df1.schema == df2.schema && df2.schema == df3.schema)

确保缺失的列自动设置为null

df1.printSchema
// root
//  |-- A: struct (nullable = true)
//  |    |-- B: long (nullable = true)
//  |    |-- D: long (nullable = true)
//  |-- C: long (nullable = true)

df1.show
// +--------+----+
// |       A|   C|
// +--------+----+
// |[1,null]|null|
// +--------+----+

df2.show
// +--------+---+
// |       A|  C|
// +--------+---+
// |[1,null]|  2|
// +--------+---+

df3.show
// +-----+---+
// |    A|  C|
// +-----+---+
// |[1,3]|  2|
// +-----+---+

注意:

此解决方案取决于数据源。它可能适用于其他数据源,也可能不适用,甚至可能导致格式错误的记录。了解更多


1
@mlk,我们在演化模式和JSON方面的主要问题是各种JSON客户端可能会做出意外的事情,比如将你期望为数组的空部分呈现为空字符串(即“”)。这可能会严重破坏你的模式管理...我想zero323也有类似的担忧。 - Ewan Leith
好的,谢谢Ewan。有没有一种简单的方法可以从avro模式转换为StructType - Michael Lloyd Lee mlk
1
除了Ewan的评论之外,JSON既不是自我描述的,也不支持架构。当然,您可以使用或创建自定义超媒体格式来处理JSON文档,但它们不是语义的一部分。当我们处理JSONL时,这变得特别麻烦。如果不经过整个文件,就无法推断出模式。此外,如果数据格式错误,运行时会出现错误。 - zero323
1
就目前而言,如果您读取两个不同分区parquet文件夹,则此操作无法与Spark 2.1.0兼容。即使您在读取之前指定了模式,列顺序也会混乱。 - Michel Lemay
1
参考我的处理进化模式的方式是这样的:我使用私有的 Structype.mergeSchema 暴露并使用手动合并来自不同来源(从 ParquetFileFormat.mergeSchemasInParallel 读取一些文件的子集)的架构,然后我读取数据帧并给出显式的架构,最后使用 select(col: _*) 技巧重新排序列。只有在这之后,我才执行联合操作。 - Michel Lemay
显示剩余5条评论

3

zero323已经回答了这个问题,不过是用Scala。以下是同样的解决方法,但使用Java。

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "2", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    StructType schema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("A",
                    DataTypes.createStructType(Arrays.asList(
                            DataTypes.createStructField("B", DataTypes.LongType, true),
                    DataTypes.createStructField("D", DataTypes.LongType, true))), true),
            DataTypes.createStructField("C", DataTypes.LongType, true)));

    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().schema(schema).json(sc.parallelize(Arrays.asList(data)));

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to C for " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to A.D for " + version);
    }
}

谢谢。Java API与更常见的Scala API大不相同。 - Manuel G
谢谢,对Java感兴趣的开发者相对较少。 - Amit Kumar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接