PySpark: Convert JSON to a PySpark DataFrame


I want to convert this JSON into a PySpark DataFrame. I have added my current code below.

json = {
    "key1": 0.75,
    "values": [
        {
            "id": 2313,
            "val1": 350,
            "val2": 6000
        },
        {
            "id": 2477,
            "val1": 340,
            "val2": 6500
        }
    ]
}

My code: running it gives the output shown below. I'm hoping someone can improve it.

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

json_string = json.dumps({
    "key1": 0.75,
    "values":[
        {
            "id": 2313,
            "val1": 350,
            "val2": 6000
        },
        {
            "id": 2477,
            "val1": 340,
            "val2": 6500
        }
    ]
})
# Read the JSON string through a one-element RDD
df = spark.read.json(spark.sparkContext.parallelize([json_string]))

# Selecting fields through the array of structs yields array columns,
# not one row per element
df = df.select("key1", "values.id", "values.val1", "values.val2")
df.show()

Output

+----+-------------+-------------+-------------+
|key1|           id|         val1|         val2|
+----+-------------+-------------+-------------+
|0.75| [2313, 2477]|   [350, 340]| [6000, 6500]|
+----+-------------+-------------+-------------+

Help getting the desired output would be appreciated.

Desired output:

+----+----+----+----+
|key1|  id|val1|val2|
+----+----+----+----+
|0.75|2313| 350|6000|
|0.75|2477| 340|6500|
+----+----+----+----+

2 Answers

You can try Spark's inline function, which explodes an array of structs into one row per array element, with one column per struct field.
df = df.selectExpr("key1", "inline(values)")

It runs smoothly, thank you for your answer. This is really helpful because I have a large number of columns. - Leonard
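
For comparison, here is a minimal sketch of the same flattening done with explode plus struct-field selection, assuming df is the frame produced by spark.read.json in the question (before its final select):

from pyspark.sql import functions as F

# Turn the array of structs into one row per element,
# then pull the individual fields out of the exploded struct
flat = df.withColumn("v", F.explode("values")) \
         .select("key1", "v.id", "v.val1", "v.val2")
flat.show()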


If you don't want to do this with explode, you can use pandas as an intermediate step:

import json
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

d = {
    "key1": 0.75,
    "values":[
        {
            "id": 2313,
            "val1": 350,
            "val2": 6000
        },
        {
            "id": 2477,
            "val1": 340,
            "val2": 6500
        }
    ]
}
# We need to put this data into columnar format for pandas
df_dict = {
    'key1': [d['key1'] for _ in range(len(d['values']))],
    'id': [x['id'] for x in d['values']],
    'val1': [x['val1'] for x in d['values']],
    'val2': [x['val2'] for x in d['values']],
}

pdf = pd.DataFrame.from_dict(df_dict)

df = spark.createDataFrame(pdf)
df.show()

+----+----+----+----+
|key1|  id|val1|val2|
+----+----+----+----+
|0.75|2313| 350|6000|
|0.75|2477| 340|6500|
+----+----+----+----+
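
As a side note, the columnar reshaping above can also be written with pandas.json_normalize; a minimal sketch, assuming pandas >= 1.0:

# Flatten the list of records, then attach the scalar key1 to every row
pdf = pd.json_normalize(d['values']).assign(key1=d['key1'])
df = spark.createDataFrame(pdf[['key1', 'id', 'val1', 'val2']])
df.show()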

An alternative without pandas or explode

# Build one flat record per element of d['values']
d_list = [
    {
        'key1': d['key1'],
        'id': v['id'],
        'val1': v['val1'],
        'val2': v['val2'],
    }
    for v in d['values']
]
json_string = json.dumps(d_list)
df = spark.read.json(spark.sparkContext.parallelize([json_string]))
df.show()

+----+----+----+----+
|  id|key1|val1|val2|
+----+----+----+----+
|2313|0.75| 350|6000|
|2477|0.75| 340|6500|
+----+----+----+----+
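
As a further simplification (a sketch, not part of the original answer): spark.createDataFrame accepts the list of dicts directly, skipping the json.dumps round-trip. Note that older Spark versions emit a deprecation warning recommending Row objects for this.

# Schema is inferred from the dict keys; column order may differ
df = spark.createDataFrame(d_list)
df.show()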
