使用PySpark在嵌套结构中删除Spark DataFrame中的列(详见正文)

3

我知道之前在这里提过类似的问题 ,但那是关于行过滤的。这次我想删除列而不是行。我试图实现Higher Order Functions(高阶函数)如 FILTER和其他一些时间,但无法让它们起作用。我认为我需要一个 SELECT Higher Order Function,但它似乎不存在。感谢您的帮助!

我正在使用pyspark并且有一个Dataframe对象df,这是df.printSchema()输出的内容。

root
 |-- M_MRN: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Observation_ID: string (nullable = true)
 |    |    |-- Observation_Name: string (nullable = true)
 |    |    |-- Observation_Result: string (nullable = true)

我希望只保留'measurements'中的'Observation_ID'或'Observation_Result'列。 当我运行 df.select('measurements').take(2) 时,目前的输出如下:

[Row(measurements=[Row(Observation_ID='5', Observation_Name='ABC', Observation_Result='108/72'),
                   Row(Observation_ID='11', Observation_Name='ABC', Observation_Result='70'),
                   Row(Observation_ID='10', Observation_Name='ABC', Observation_Result='73.029'),
                   Row(Observation_ID='14', Observation_Name='XYZ', Observation_Result='23.1')]),
 Row(measurements=[Row(Observation_ID='2', Observation_Name='ZZZ', Observation_Result='3/4'),
                   Row(Observation_ID='5', Observation_Name='ABC', Observation_Result='7')])]

我希望在执行以上筛选并运行df.select('measurements').take(2)之后,能得到以下结果:

[Row(measurements=[Row(Observation_ID='5', Observation_Result='108/72'),
                   Row(Observation_ID='11', Observation_Result='70'),
                   Row(Observation_ID='10', Observation_Result='73.029'),
                   Row(Observation_ID='14', Observation_Result='23.1')]),
 Row(measurements=[Row(Observation_ID='2', Observation_Result='3/4'),
                   Row(Observation_ID='5', Observation_Result='7')])]

有没有在pyspark中实现这个的方法?感谢您的帮助!

2个回答

3
你可以使用 高阶函数 transform选择 你想要的字段并将它们放入一个 结构体 中。
from pyspark.sql import functions as F
df.withColumn("measurements",F.expr("""transform(measurements\
,x-> struct(x.Observation_ID as Observation_ID,\
             x.Observation_Result as Observation_Result))""")).printSchema()

#root
 #|-- measurements: array (nullable = true)
 #|    |-- element: struct (containsNull = false)
 #|    |    |-- Observation_ID: string (nullable = true)
 #|    |    |-- Observation_Result: string (nullable = true)

3
如果我需要从数百列中删除一到两列,而不是选择它们,有没有什么方法可以做到这一点? - martian_rover

0

对于正在寻找适用于旧版pyspark的答案的任何人,这里有一个使用udfs的答案:

import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

_measurement_type = ArrayType(StructType([
    StructField('Observation_ID', StringType(), True),
    StructField('Observation_Result', StringType(), True)
]))

@f.udf(returnType=_measurement_type)
def higher_order_select(measurements):
    return [(m.Observation_ID, m.Observation_Result) for m in measurements]

df.select(higher_order_select('measurements').alias('measurements')).printSchema()

打印

root
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Observation_ID: string (nullable = true)
 |    |    |-- Observation_Result: string (nullable = true)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接