I am trying to drop some nested struct columns from a Spark dataframe using PySpark.
I found some Scala code on Stack Overflow that seems to do exactly what I want, but I am not familiar with Scala and don't know how to translate it into Python.
https://dev59.com/slwY5IYBdhLWcg3wWWm0#39943812
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html
Example (PySpark):
from pyspark.sql.functions import struct

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    # list the struct's child fields, minus the one to drop, qualified as "struct.field"
    fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm, df.select("{}.*".format(struct_nm)).columns)
    fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x), fields_to_keep))
    return df.withColumn(struct_nm, struct(fields_to_keep))
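For example, assuming a dataframe with a top-level struct column a that contains a child field e, a call might look like:
df = drop_col(df, "a", "e")
Note that struct_nm has to be a top-level struct column here, because withColumn can only create or replace top-level columns.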
One way I found using PySpark is to first convert the nested column to JSON, then parse the converted JSON with a new nested schema from which the unwanted columns have been filtered out.
Suppose I have the following schema and I want to drop d, e and j (a.b.d, a.e, a.h.j) from the dataframe:
root
|-- a: struct (nullable = true)
| |-- b: struct (nullable = true)
| | |-- c: long (nullable = true)
| | |-- d: string (nullable = true)
| |-- e: struct (nullable = true)
| | |-- f: long (nullable = true)
| | |-- g: string (nullable = true)
| |-- h: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- i: string (nullable = true)
| | | |-- j: string (nullable = true)
|-- k: string (nullable = true)
1. Create a new schema for a by excluding d, e and j. A quick way to do this is to manually select the fields you want from df.select("a").schema and build a new schema from the selected fields using StructType. Or you can do it programmatically by traversing the schema tree and excluding the unwanted fields, something like:
from pyspark.sql.types import ArrayType, StructField, StructType

def exclude_nested_field(schema, unwanted_fields, parent=""):
    new_schema = []
    for field in schema:
        full_field_name = field.name
        if parent:
            full_field_name = parent + "." + full_field_name
        if full_field_name not in unwanted_fields:
            if isinstance(field.dataType, StructType):
                inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
                # recurse into array-of-struct elements so fields like "h.j" are dropped too
                inner_schema = exclude_nested_field(field.dataType.elementType, unwanted_fields, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            elif isinstance(field.dataType, ArrayType):
                new_schema.append(StructField(field.name, ArrayType(field.dataType.elementType)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
    return StructType(new_schema)

new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
2. Convert the a column to JSON: .withColumn("json", F.to_json("a")).drop("a")
3. Parse the JSON-converted a column from step 2 with the new schema found in step 1: .withColumn("a", F.from_json("json", new_schema)).drop("json")
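Putting steps 2 and 3 together for this example (a sketch, assuming F is pyspark.sql.functions and new_schema comes from step 1):

df = (df
      .withColumn("json", F.to_json("a")).drop("a")
      .withColumn("a", F.from_json("json", new_schema)).drop("json"))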
In Spark 3.1+ you can use Column.dropFields to drop d, e and j directly:
from pyspark.sql import functions as F
df = spark.createDataFrame([], "a struct<b:struct<c:bigint,d:string>,e:struct<f:bigint,g:string>,h:array<struct<i:string,j:string>>>, k string")
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | | |-- d: string (nullable = true) # <<--- to be dropped
# | |-- e: struct (nullable = true) # <<--- to be dropped
# | | |-- f: long (nullable = true)
# | | |-- g: string (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# | | | |-- j: string (nullable = true) # <<--- to be dropped
# |-- k: string (nullable = true)
Dropping e is the simplest:
df = df.withColumn("a", F.col("a").dropFields("e"))
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | | |-- d: string (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# | | | |-- j: string (nullable = true)
# |-- k: string (nullable = true)
To drop d, we have to go inside b:
df = df.withColumn("a", F.col("a").withField("b", F.col("a.b").dropFields("d")))
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# | | | |-- j: string (nullable = true)
# |-- k: string (nullable = true)
j is inside an array, so transform must be used. It "loops" over every array element (in this case, the element is a struct) and performs the transformation (dropping a field).
df = df.withColumn("a", F.col("a").withField(
    "h",
    F.transform(
        F.col("a.h"),
        lambda x: x.dropFields("j")
    )
))
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# |-- k: string (nullable = true)
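For reference, the three drops can also be chained into a single expression (a sketch that should be equivalent to the stepwise version above):

df = df.withColumn("a", F.col("a")
                   .dropFields("e")
                   .withField("b", F.col("a.b").dropFields("d"))
                   .withField("h", F.transform(F.col("a.h"), lambda x: x.dropFields("j"))))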
Although I have no solution for PySpark, maybe it is easier to translate this into Python. Consider a dataframe df with the schema:
root
|-- employee: struct (nullable = false)
| |-- name: string (nullable = false)
| |-- age: integer (nullable = false)
To drop name, you can do the following:
val fieldsToKeep = df.select($"employee.*").columns
  .filter(_ != "name") // the nested column you want to drop
  .map(n => "employee." + n)

// overwrite column with subset of fields
df.withColumn("employee", struct(fieldsToKeep.head, fieldsToKeep.tail: _*))
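A rough PySpark equivalent of this Scala snippet might be (a sketch, assuming a df with the employee schema above):

from pyspark.sql import functions as F

fields_to_keep = [
    "employee." + c
    for c in df.select("employee.*").columns
    if c != "name"  # the nested column you want to drop
]
# overwrite the column with the subset of fields
df = df.withColumn("employee", F.struct(*fields_to_keep))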
Here is the PySpark version of Raphael's Scala answer. It runs at a specific depth, drops everything above that depth and filters the fields below it.
def remove_columns(df, root):
    # root is a nested path ending in ".*", e.g. "level1.level2.*"
    cols = df.select(root).columns
    fields_filter = filter(lambda x: x[0] != "$", cols)  # use your own lambda here
    fields_to_keep = list(map(lambda x: root[:-1] + x, fields_filter))
    return df.select(fields_to_keep)
df = remove_columns(raw_df, root="level1.level2.*")
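Note that select flattens the result, so the kept fields come back as top-level columns. As a hypothetical variation (not part of the original answer), the filter predicate could be passed in instead of being hard-coded:

def remove_columns_where(df, root, keep):
    # keep: hypothetical predicate deciding which fields under root survive
    cols = df.select(root).columns
    fields_to_keep = [root[:-1] + c for c in cols if keep(c)]
    return df.select(fields_to_keep)

# e.g. drop a.b.d from the schema in the earlier answers; the result has
# the remaining fields of a.b (here just "c") as flattened top-level columns
df2 = remove_columns_where(df, root="a.b.*", keep=lambda c: c != "d")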