Pyspark中如何删除结构列中的字段

3
我想从一个结构体的值中删除一部分,并将该版本的值保存为我的数据框中的新列,它看起来像这样:
column
{"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}
我想删除字段 C 及其值,并将其余部分保存为一个新列,而不是将 ABD 字段分成不同的列。我想要的结果应该像这样:
column newColumn
{"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"} {"A": "2022-01-26T14:21:32.214+0000", "B": 69, "D": "XD"}
我已经成功地通过将我的数据框转换为一个字典来删除了 C,但现在我无法将其转换回一个列。我的删除 C 的尝试如下:
dfTemp = df.select('column').collect()[0][0].asDict(True)
dfDict = {}
for k in dfTemp:
    if k != 'C':
        dfDict[k] = dfTemp[k]

如果您有更好的方法可以删除类似于我的结构中的一部分,并将结果保留在一列中而不是添加更多行,或者如果您知道如何将字典转换为数据框而不将键和值对分成单独的列,请提出建议。

2个回答

2
假设您的列是字符串类型且包含 JSON 数据,您可以使用 from_json 方法将其首先解析为 StructType 类型,代码如下:
df = spark.createDataFrame([
    ('{"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}',)
], ["column"])

df = df.withColumn(
    "parsed_column",
    F.from_json("column", "struct<A:string,B:int,C:struct<A:int,CB:string>,D:string>")
)

现在需要从结构化列中删除字段 C:

Spark >=3.1

使用dropFields方法:

result = df.withColumn("newColumn", F.to_json(F.col("parsed_column").dropFields("C"))).drop("parsed_column")

result.show(truncate=False)

#+-----------------------------------------------------------------------------------------+----------------------------------------------------+
#|column                                                                                   |newColumn                                           |
#+-----------------------------------------------------------------------------------------+----------------------------------------------------+
#|{"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}|{"A":"2022-01-26T14:21:32.214+0000","B":69,"D":"XD"}|
#+-----------------------------------------------------------------------------------------+----------------------------------------------------+

Spark <3.1

重新创建结构列并过滤字段C

result = df.withColumn(
    "newColumn",
    F.to_json(
        F.struct(*[
            F.col(f"parsed_column.{c}").alias(c)
            for c in df.selectExpr("parsed_column.*").columns if c != 'C'
        ])
    )
).drop("parsed_column")

今日免费次数已满, 请开通会员/明日再来
result = df.withColumn(
    "newColumn",
    F.to_json(
        F.map_filter(
            F.from_json("column", "map<string,string>"),
            lambda k, v: k != "C"
        )
    )

0

嗯,这并不像看起来那么简单。首先,你的方法并不适用于Spark,除非你处理的数据很少(因此,你不需要Spark),最好使用纯Python就像你尝试过的那样。使用collect()将所有数据获取到驱动程序上,这对于大量数据是行不通的。

针对此问题的分布式方法如下:

  • 在您的JSON数据的一部分上推断模式(除非您想手动执行此操作-这很繁琐)
  • 使用此模式转换您的数据框以访问命名属性
  • 根据需要选择属性并返回JSON

我尽可能地进行了分解:

from pyspark.sql.types import IntegerType, StructType, StringType, StructField
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Create input data
data = [json.dumps({"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"})]
df = spark.createDataFrame(data, "string").toDF("colA")
df.show()

+-----------------------------------------------------------------------------------------+
|colA                                                                                     |
+-----------------------------------------------------------------------------------------+
|{"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}|
+-----------------------------------------------------------------------------------------+


# Infer schema - infering on first 10 rows
s = df.select(F.col("colA").alias("s")).rdd.map(lambda x: x.s).take(10)
schema = spark.read.json(sc.parallelize(s)).schema
print(schema)

# StructType(List(StructField(A,StringType,true),StructField(B,LongType,true),StructField(C,StructType(List(StructField(CA,LongType,true),StructField(CB,StringType,true))),true),StructField(D,StringType,true)))


# read JSON string with schema
new_df = df.withColumn("colB", F.from_json("colA", schema))
new_df.show(truncate=False)

+-----------------------------------------------------------------------------------------+---------------------------------------------------+
|colA                                                                                     |colB                                               |
+-----------------------------------------------------------------------------------------+---------------------------------------------------+
|{"A": "2022-01-26T14:21:32.214+0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}|{2022-01-26T14:21:32.214+0000, 69, {42, Hello}, XD}|
+-----------------------------------------------------------------------------------------+---------------------------------------------------+


# Finally ...
new_df.select(F.to_json(F.struct("colB.A", "colB.B", "colB.D")).alias("colC")).show(truncate=False)

+----------------------------------------------------+
|colC                                                |
+----------------------------------------------------+
|{"A":"2022-01-26T14:21:32.214+0000","B":69,"D":"XD"}|
+----------------------------------------------------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接