Pyspark:将列中的JSON展开为多个列

43

数据看起来像这样 -

+-----------+-----------+-----------------------------+
|         id|      point|                         data|
+-----------------------------------------------------+
|        abc|          6|{"key1":"124", "key2": "345"}|
|        dfl|          7|{"key1":"777", "key2": "888"}|
|        4bd|          6|{"key1":"111", "key2": "788"}|

我正在尝试将其分解为以下格式。

+-----------+-----------+-----------+-----------+
|         id|      point|       key1|       key2|
+------------------------------------------------
|        abc|          6|        124|        345|
|        dfl|          7|        777|        888|
|        4bd|          6|        111|        788|
< p > explode函数将数据框拆分成多行。但这不是期望的解决方案。

注意:这个解决方案没有回答我的问题。PySpark "explode" dict in column

6个回答

56
只要您使用的是2.1或更高版本的Spark,pyspark.sql.functions.from_json 应该能为您提供所需的结果,但首先需要定义所需的 schema
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('key1', StringType(), True),
        StructField('key2', StringType(), True)
    ]
)

df.withColumn("data", from_json("data", schema))\
    .select(col('id'), col('point'), col('data.*'))\
    .show()

这应该会给你

+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|df1|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+

6
您可以使用以下代码从数据字段中提取JSON的模式:schema = spark.read.json(df.rdd.map(lambda row: row.data)).schema - Simon Peacock
1
有没有不提供模式的方法来实现这一点?在Spark流作业的上下文中,上述模式提取不是一个选项@SimonPeacock,写下完整的模式是..凌乱(至少可以这么说),而且也相当不灵活,因为我希望额外的字段出现而无需调整和重新启动整个流作业。 - denise
使用 df.schema 获取模式,不要忘记将所有数据类型都使用 StringType(),否则可能会对其他数据类型以及字符串类型产生 null 值。 - Ankit Agrawal
1
如果您想选择所有其余的DF列并扩展json列,请使用以下代码:df2 = df.select("*", col("data.*")) - Shrikant Prabhu

8

正如 @pault 建议的那样,数据字段是一个 string 字段。由于在 JSON 字符串中的行中键是相同的(即 'key1','key2'),因此您还可以使用 json_tuple()(根据文档,此函数是版本 1.6 中的新功能)。

from pyspark.sql import functions as F

df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()

以下是我的原始帖子:,如果原始表来自df.show(truncate=False),那么它很可能是错误的,因此data字段不是Python数据结构。
由于您已将数据拆分为行,我假设列data是Python数据结构而不是字符串:
from pyspark.sql import functions as F

df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()

我认为这在这种情况下不起作用 - 你需要一个MapType()列来使用getItem(),但是它看起来在这里是一个字符串。 - pault
原帖提到结果已经被拆分成多行,这似乎不是一个字符串字段。 - jxc
"explode函数将数据框拆分为多行。" 这句话听起来像是OP在陈述一个事实,而不是他们尝试过什么。此外,如果它是MapType(),它不会显示在帖子中所示的方式。 - pault
1
谢谢,我认为你可能是正确的。但当JSON字符串中的键是常量时,我认为它可以更简单。 - jxc
这个可行。我不知道 json_tuple - 它比定义模式容易多了。 - pault
我为循环挣扎了一天,这真的很酷。 - Arpit Sisodia

2
如@jxc所提到的,如果您无法预先定义模式且仅需要处理单个级别的json字符串,则json_tuple应该可以正常工作。我认为这更直接和易于使用。奇怪的是,我之前没有发现其他人提到过此功能。
在我的用例中,原始数据框架模式:StructType(List(StructField(a,StringType,true))),json字符串列显示为:
+---------------------------------------+
|a                                      |
+---------------------------------------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
|{"k1": "v11", "k3": "v33"}             |
|{"k1": "v13", "k2": "23"}              |
+---------------------------------------+

使用json_tuple将json字段扩展为新列:

from pyspark.sql import functions as F

df = df.select(F.col('a'), 
    F.json_tuple(F.col('a'), 'k1', 'k2', 'k3') \
    .alias('k1', 'k2', 'k3'))

df.schema
df.show(truncate=False)

该文档并没有提供太多相关信息,但至少在我的使用场景中,通过 json_tuple 提取的新列是 StringType 类型,并且只能提取 JSON 字符串的单层深度。
StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))

+---------------------------------------+---+----+-------+
|a                                      |k1 |k2  |k3     |
+---------------------------------------+---+----+-------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2   |{"m":1}|
|{"k1": "v11", "k3": "v33"}             |v11|null|v33    |
|{"k1": "v13", "k2": "23"}              |v13|23  |null   |
+---------------------------------------+---+----+-------+

1
在这种方法中,您只需要设置具有Json内容的列名。无需设置模式。它会自动完成所有操作。
json_col_name = 'data'
keys = df.select(f"{json_col_name}.*").columns
jsonFields= [f"{json_col_name}.{key} {key}" for key in keys]

main_fields = [key for key in df.columns if key != json_col_name]
df_new = df.selectExpr(main_fields + jsonFields)

0

这适用于我的使用情况

data1 = spark.read.parquet(path)
json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
data2 = data1.withColumn("data", from_json("json_col", json_schema))
col1 = data2.columns
col1.remove("data")
col2 = data2.select("data.*").columns
append_str ="data."
col3 = [append_str + val for val in col2]
col_list = col1 + col3
data3 = data2.select(*col_list).drop("json_col")

0

所有功劳归于Shrikant Prabhu

你可以简单地使用SQL

SELECT id, point, data.*
FROM original_table

这样,如果数据发生变化,新表的模式将会自适应,您无需在管道中进行任何操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接