Pyspark: Create a schema from a JSON schema file that contains array columns

I have defined the schema of my DataFrame in a JSON file, as shown below:

{
    "table1":{
        "fields":[
            {"metadata":{}, "name":"first_name", "type":"string", "nullable":false},
            {"metadata":{}, "name":"last_name", "type":"string", "nullable":false},
            {"metadata":{}, "name":"subjects", "type":"array","items":{"type":["string", "string"]}, "nullable":false},
            {"metadata":{}, "name":"marks", "type":"array","items":{"type":["integer", "integer"]}, "nullable":false},
            {"metadata":{}, "name":"dept", "type":"string", "nullable":false}       
        ]
    }

}

Sample JSON data:

{
    "table1": [
        {
            "first_name":"john",
            "last_name":"doe",
            "subjects":["maths","science"],
            "marks":[90,67],
            "dept":"abc"        
        },
        {
            "first_name":"dan",
            "last_name":"steyn",
            "subjects":["maths","science"],
            "marks":[90,67],
            "dept":"abc"        
        },
        {
            "first_name":"rose",
            "last_name":"wayne",
            "subjects":["maths","science"],
            "marks":[90,67],
            "dept":"abc"            
        },
        {
            "first_name":"nat",
            "last_name":"lee",
            "subjects":["maths","science"],
            "marks":[90,67],
            "dept":"abc"        
        },
        {
            "first_name":"jim",
            "last_name":"lim",
            "subjects":["maths","science"],
            "marks":[90,67],
            "dept":"abc"        
        }       
    ]
}

I want to create the corresponding Spark schema from this JSON file. Here is my code (reference: Create spark dataframe schema from json schema representation):
import json
from pyspark.sql.types import StructType

with open(schemaFile) as s:
    schema = json.load(s)["table1"]
    source_schema = StructType.fromJson(schema)

The above code works fine as long as the schema has no array columns. However, when the schema does contain array columns, it throws the following error:

"Could not parse datatype: array" ("Could not parse datatype: %s" % json_value)

Have you tried working the other way around? You could build the schema as a Python object, including the arrays, convert it to JSON, and compare the difference. - Steven
The schema you posted is not valid; there is a comma missing after "items":{"type":["string", "string"]}. I think it would be better to post your actual data, or to load the JSON in Spark and export the schema that Spark creates. - abiratsis
@AlexandrosBiratsis: The schema has been updated. My actual data is a CSV file. I am trying to keep this schema, together with several others, in one JSON file; when reading the CSV in Spark I will reference this JSON file to pick the right schema, so the columns get the correct headers and data types. - blackfury
Yes, I saw that @blackfury, but your schema is still invalid! "items":{"type":["string", "string"]} is not a valid definition; what exactly are you trying to express? Could you post some actual JSON data? - abiratsis
@AlexandrosBiratsis: I have added sample JSON data. - blackfury
1 Answer

In your case the problem is the representation of the arrays. The correct syntax is:

{ "metadata": {}, "name": "marks", "nullable": true, "type": {"containsNull": true, "elementType": "long", "type": "array" } }.

To retrieve the schema from the JSON data, you can use the following PySpark snippet:

jsonData = """{
    "table1": [{
            "first_name": "john",
            "last_name": "doe",
            "subjects": ["maths", "science"],
            "marks": [90, 67],
            "dept": "abc"
        },
        {
            "first_name": "dan",
            "last_name": "steyn",
            "subjects": ["maths", "science"],
            "marks": [90, 67],
            "dept": "abc"
        },
        {
            "first_name": "rose",
            "last_name": "wayne",
            "subjects": ["maths", "science"],
            "marks": [90, 67],
            "dept": "abc"
        },
        {
            "first_name": "nat",
            "last_name": "lee",
            "subjects": ["maths", "science"],
            "marks": [90, 67],
            "dept": "abc"
        },
        {
            "first_name": "jim",
            "last_name": "lim",
            "subjects": ["maths", "science"],
            "marks": [90, 67],
            "dept": "abc"
        }
    ]
}"""

df = spark.read.json(sc.parallelize([jsonData]))

df.schema.json()

This should output:

{
    "fields": [{
        "metadata": {},
        "name": "table1",
        "nullable": true,
        "type": {
            "containsNull": true,
            "elementType": {
                "fields": [{
                    "metadata": {},
                    "name": "dept",
                    "nullable": true,
                    "type": "string"
                }, {
                    "metadata": {},
                    "name": "first_name",
                    "nullable": true,
                    "type": "string"
                }, {
                    "metadata": {},
                    "name": "last_name",
                    "nullable": true,
                    "type": "string"
                }, {
                    "metadata": {},
                    "name": "marks",
                    "nullable": true,
                    "type": {
                        "containsNull": true,
                        "elementType": "long",
                        "type": "array"
                    }
                }, {
                    "metadata": {},
                    "name": "subjects",
                    "nullable": true,
                    "type": {
                        "containsNull": true,
                        "elementType": "string",
                        "type": "array"
                    }
                }],
                "type": "struct"
            },
            "type": "array"
        }
    }],
    "type": "struct"
}
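Note that df.schema.json() actually returns this as one compact string; the indented view shown here is just pretty-printed. To get the same readable form yourself, one option is something along these lines:

import json
# jsonValue() returns the schema as a Python dict, which json.dumps can indent
print(json.dumps(df.schema.jsonValue(), indent=4, sort_keys=True))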

Alternatively, you can use df.schema.simpleString(), which returns the schema in a more compact format:

struct<table1:array<struct<dept:string,first_name:string,last_name:string,marks:array<bigint>,subjects:array<string>>>>

Finally, you can store the schema above in a file and load it again later with:

import json
from pyspark.sql.types import StructType

new_schema = StructType.fromJson(json.loads(schema_json))

just as you are already doing. Keep in mind that you can also apply the process described above dynamically to any JSON data.
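Since the comments mention that the real data is a CSV file, an end-to-end sketch of that workflow could look like the following; the file names schemas.json and data.csv and the header=False setting are placeholders, not details from the question:

import json
from pyspark.sql.types import StructType

# Load the Spark-format schema for the table of interest
# (schemas.json is assumed to hold one entry per table, keyed by table name)
with open("schemas.json") as f:
    table_schema = StructType.fromJson(json.load(f)["table1"])

# Apply the schema while reading the CSV so the columns get the
# intended names and data types instead of inferred ones
df = spark.read.csv("data.csv", schema=table_schema, header=False)
df.printSchema()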

