Dropping nested columns from a DataFrame with PySpark

11

I am trying to drop some nested struct columns from a Spark dataframe using PySpark.

I found some Scala code on Stack Overflow that seems to do exactly what I want, but I'm not familiar with Scala and don't know how to translate it into Python.
https://dev59.com/slwY5IYBdhLWcg3wWWm0#39943812

6 Answers

6

Doesn't work: File "C:\spark-3.1.2-bin-hadoop2.7\python\pyspark\sql\dataframe.py", line 1643, in __getattr__, raise AttributeError: 'DataFrame' object has no attribute 'dropFields' - deathrace
1
It's Column.dropFields, not DataFrame.dropFields @deathrace - Pierre

4

Example in pyspark:

from pyspark.sql.functions import struct

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    # list the struct's child fields, excluding the one to drop, and re-qualify them (e.g. "a.b")
    fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm, df.select("{}.*".format(struct_nm)).columns)
    fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x), fields_to_keep))
    # rebuild the struct column from the kept fields only
    return df.withColumn(struct_nm, struct(fields_to_keep))

Could you please explain what the parameters mean? - Ébe Isaac
This seems to work for me. df = the dataframe, col_nm = the parent column name, delete_col_nm = the name of the target child column to drop - NegatioN
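
To illustrate those parameters concretely, here is a small usage sketch (the struct column "a" and child field "e" are assumed from the example schema in the answers below, not stated in this answer):

# hypothetical call: drop the child field "e" from the struct column "a"
df = drop_col(df, struct_nm="a", delete_struct_child_col_nm="e")
df.printSchema()  # "a" now contains only its remaining child fields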

3

One way I found to do this with pyspark is to first convert the nested column to json, and then parse the converted json with a new nested schema from which the unwanted columns have been filtered out.

Suppose I have the following schema and I want to drop d, e and j (a.b.d, a.e, a.h.j) from the dataframe:

root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |    |-- h: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- i: string (nullable = true)
 |    |    |    |-- j: string (nullable = true)
 |-- k: string (nullable = true)

Here is the approach I used:
  1. Create a new schema for a by excluding d, e and j. A quick way to do this is to manually select the fields you want from df.select("a").schema and create a new schema from the selected fields using StructType. Or, you can do it programmatically by traversing the schema tree and excluding the unwanted fields, something like:

    from pyspark.sql.types import StructType, StructField, ArrayType

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []

        for field in schema:
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name

            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema, field.nullable))
                elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
                    # recurse into the array's element struct so fields like "h.j" are also excluded
                    element_schema = exclude_nested_field(field.dataType.elementType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, ArrayType(element_schema), field.nullable))
                elif isinstance(field.dataType, ArrayType):
                    new_schema.append(StructField(field.name, ArrayType(field.dataType.elementType), field.nullable))
                else:
                    new_schema.append(StructField(field.name, field.dataType, field.nullable))

        return StructType(new_schema)

    new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
    
  2. Convert the a column to json: .withColumn("json", F.to_json("a")).drop("a")

  3. Parse the json-converted a column from step 2 with the new schema found in step 1: .withColumn("a", F.from_json("json", new_schema)).drop("json")
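
Putting steps 2 and 3 together, a minimal end-to-end sketch (assuming the df and new_schema from step 1, and the usual import of pyspark.sql.functions as F):

from pyspark.sql import functions as F

df = (df
      .withColumn("json", F.to_json("a")).drop("a")        # step 2: serialize the struct to json
      .withColumn("a", F.from_json("json", new_schema))    # step 3: re-parse with the pruned schema
      .drop("json"))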


I'm trying to use this function, but my struct has an array<struct<...>> and I get "TypeError: 'ArrayType' object is not iterable". Any idea how to solve this? - Alex Fragotsis
As of Spark 3.1.1 we now have better support for working with nested fields, letting you edit or remove them without touching the other fields. https://germanschiavon.medium.com/spark-3-nested-fields-not-so-nested-anymore-9b8d34b00b95 - Pierre

2
Given the following dataframe, the goal is to drop d, e and j:
from pyspark.sql import functions as F
df = spark.createDataFrame([], "a struct<b:struct<c:bigint,d:string>,e:struct<f:bigint,g:string>,h:array<struct<i:string,j:string>>>, k string")
df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- d: string (nullable = true)      # <<--- to be dropped
#  |    |-- e: struct (nullable = true)           # <<--- to be dropped
#  |    |    |-- f: long (nullable = true)
#  |    |    |-- g: string (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)  # <<--- to be dropped
#  |-- k: string (nullable = true)

Dropping e is the simplest:

df = df.withColumn("a", F.col("a").dropFields("e"))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- d: string (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)
#  |-- k: string (nullable = true)

To drop d, we have to go inside b:
df = df.withColumn("a", F.col("a").withField("b", F.col("a.b").dropFields("d")))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)
#  |-- k: string (nullable = true)

j is inside an array, so transform must be used. It "loops" over every array element (here each element is a struct) and applies the transformation (dropping a field).

df = df.withColumn("a", F.col("a").withField(
    "h",
    F.transform(
        F.col("a.h"),
        lambda x: x.dropFields("j")
    )
))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |-- k: string (nullable = true)
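
For reference, the three drops above can also be chained into a single expression; this is just a condensed sketch of the same steps, using the same column names:

df = df.withColumn(
    "a",
    F.col("a")
        .dropFields("e")                                                         # drop a.e
        .withField("b", F.col("a.b").dropFields("d"))                            # drop a.b.d
        .withField("h", F.transform(F.col("a.h"), lambda x: x.dropFields("j")))  # drop a.h[*].j
)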

1

Although I don't have a PySpark solution, maybe it is easy to translate this to Python. Consider a dataframe df with the following schema:

root
 |-- employee: struct (nullable = false)
 |    |-- name: string (nullable = false)
 |    |-- age: integer (nullable = false)

Then, if you want to drop name for example, you can do:

val fieldsToKeep = df.select($"employee.*").columns
.filter(_!="name") // the nested column you want to drop
.map(n => "employee."+n)

// overwrite column with subset of fields
df
.withColumn("employee",struct(fieldsToKeep.head,fieldsToKeep.tail:_*)) 

0

This is the PySpark version of Raphael's Scala answer.

It operates at a specific depth, drops everything above that depth and filters what lies below it.

def remove_columns(df, root):
    # list all fields under the given root, e.g. "level1.level2.*"
    cols = df.select(root).columns
    # keep only the fields you want; replace this lambda with your own filter
    fields_filter = filter(lambda x: x[0] != "$", cols)
    # re-qualify each kept field with its parent path ("level1.level2." + field)
    fieldsToKeep = list(map(lambda x: root[:-1] + x, fields_filter))
    return df.select(fieldsToKeep)

df = remove_columns(raw_df, root="level1.level2.*")
