Spark SQL将数组扩展为多个列

3

我正在将来自Oracle源的每行更新的JSON消息存储在S3中。 JSON结构如下:

{
    "tableName": "ORDER",
    "action": "UPDATE",
    "timeStamp": "2016-09-04 20:05:08.000000",
    "uniqueIdentifier": "31200477027942016-09-05 20:05:08.000000",
    "columnList": [{
        "columnName": "ORDER_NO",
        "newValue": "31033045",
        "oldValue": ""
    }, {
        "columnName": "ORDER_TYPE",
        "newValue": "N/B",
        "oldValue": ""
    }]
}

我正在使用Spark SQL根据唯一标识的最大值,查找每个键的最新记录。columnList是表格列的列表数组。我想连接多个表格,并获取最新的记录。如何从一个表格的JSON数组中加入列到另一个表格的列中?有没有一种方法可以将JSON数组展开成多个列?例如上面的JSON将拥有一个ORDER_NO列和一个ORDER_TYPE列。如何基于columnName字段创建一个具有多个列的数据框架?例如:新的RDD应该包含列(tableName、action、timeStamp、uniqueIdentifier、ORDER_NO、ORDER_NO)。ORDER_NO和ORDER_NO字段的值应该从JSON的newValue字段映射过来。
1个回答

0
已经通过使用RDD API以编程方式创建模式找到了此问题的解决方案。
  Dataset<Row> dataFrame = spark.read().json(inputPath);
    dataFrame.printSchema();
    JavaRDD<Row> rdd = dataFrame.toJavaRDD();
    SchemaBuilder schemaBuilder = new SchemaBuilder();
    // get the schema column names in appended format
    String columnNames = schemaBuilder.populateColumnSchema(rdd.first(), dataFrame.columns());

SchemaBuilder是一个自定义类,它接收RDD细节并返回一个分隔符分隔的列名。 然后使用RowFactory.create调用,将JSON值映射到模式中。 文档参考http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接