使用Pyspark将Json数据扁平化

5
my_data=[
    {'stationCode': 'NB001',
       'summaries': [{'period': {'year': 2017}, 'rainfall': 449},
        {'period': {'year': 2018}, 'rainfall': 352.4},
        {'period': {'year': 2019}, 'rainfall': 253.2},
        {'period': {'year': 2020}, 'rainfall': 283},
        {'period': {'year': 2021}, 'rainfall': 104.2}]},
    {'stationCode': 'NA003',
       'summaries': [{'period': {'year': 2019}, 'rainfall': 58.2},
        {'period': {'year': 2020}, 'rainfall': 628.2},
        {'period': {'year': 2021}, 'rainfall': 120}]}]

在Pandas中,我可以:
import pandas as pd
from pandas import json_normalize
pd.concat([json_normalize(entry, 'summaries', 'stationCode') 
                     for entry in my_data])

这将会给我生成以下表格:
    rainfall  period.year stationCode
0     449.0         2017       NB001
1     352.4         2018       NB001
2     253.2         2019       NB001
3     283.0         2020       NB001
4     104.2         2021       NB001
0      58.2         2019       NA003
1     628.2         2020       NA003
2     120.0         2021       NA003

在pyspark中能否用一行代码实现这个功能?

我已经尝试了下面的代码,它给出了相同的结果。但是,它太长了,有没有缩短它的方法?

df=sc.parallelize(my_data)
df1=spark.read.json(df)


  df1.select("stationCode","summaries.period.year","summaries.rainfall").display()
  df1 = df1.withColumn("year_rainfall", F.arrays_zip("year", "rainfall"))
           .withColumn("year_rainfall", F.explode("year_rainfall"))
           .select("stationCode", 
               F.col("year_rainfall.rainfall").alias("Rainfall"), 
               F.col("year_rainfall.year").alias("Year"))
  df1.display(20, False)

我正在了解pyspark,欢迎提供相关说明或好的信息来源。

2个回答

2
考虑一个包含以下数据的示例json文件。
{
   "Name": "TestName",
   "Date": "2021-04-09",
   "Readings": [
      {
        "Id": 1,
        "Reading": 5.678,
        "datetime": "2021-04-09 00:00:00"
     },
     {
        "Id": 2,
        "Reading": 3.692,
        "datetime": "2020-04-09 00:00:00"
     }
  ]
}

定义一个模式,我们可以强制读取我们的数据。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

data_schema = StructType(fields=[
   StructField('Name', StringType(), False),
   StructField('Date', StringType(), True),
   StructField(
       'Readings', ArrayType(
          StructType([
             StructField('Id', IntegerType(), False),
             StructField('Reading', DoubleType(), True),
             StructField('datetime', StringType(), True)
          ])
       )
    )
])

现在我们可以使用模式来读取目录中的JSON文件。
data_df = spark.read.json('/mnt/data/' + '*.json', schema=data_schema)

我们需要嵌套在“读数”中的数据,以便我们可以使用explode获取这些子列。
from pyspark.sql.functions import explode

data_df = data_df.select(
    "Name",
    explode("Readings").alias("ReadingsExplode")
).select("Name", "ReadingsExplode.*")

data_df.show()

这应该可以通过展平数据框来提供所需的输出。

2
你的内容看起来很好,易于阅读。不过你也可以直接压缩和解压:
out = (df1.select("stationCode", 
      F.explode(F.arrays_zip(*["summaries.period.year","summaries.rainfall"])))
.select("stationCode",F.col("col")['0'].alias("year"),F.col("col")['1'].alias("rainfall")))

out.show()

+-----------+----+--------+
|stationCode|year|rainfall|
+-----------+----+--------+
|      NB001|2017|   449.0|
|      NB001|2018|   352.4|
|      NB001|2019|   253.2|
|      NB001|2020|   283.0|
|      NB001|2021|   104.2|
|      NA003|2019|    58.2|
|      NA003|2020|   628.2|
|      NA003|2021|   120.0|
+-----------+----+--------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接