如何在PySpark数据框中向嵌套结构中添加列?

26
我有一个带有类似于架构的数据框。
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)

我想在state结构体中添加列,也就是创建一个具有以下模式的数据框:

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)

我尝试过

df.withColumn('state.a', val).printSchema()
# root
#  |-- state: struct (nullable = true)
#  |    |-- fld: integer (nullable = true)
#  |-- state.a: integer (nullable = true)

1
您可以使用所需的模式和UDF创建新列,然后删除旧列。据我所知,您无法更改结构列的模式。请参见此问题 - pauli
7个回答

33

这里有一种方法可以在不使用 udf 的情况下完成:

# create example dataframe
import pyspark.sql.functions as f
data = [
    ({'fld': 0},)
]

schema = StructType(
    [
        StructField('state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)

df = sqlCtx.createDataFrame(data, schema)
df.printSchema()
#root
# |-- state: struct (nullable = true)
# |    |-- fld: integer (nullable = true)

现在使用 withColumn(),并使用 lit()alias() 添加新字段。

val = 1
df_new = df.withColumn(
    'state', 
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)

如果你的嵌套结构中有很多字段,可以使用列表推导式,并使用df.schema["state"].dataType.names来获取字段名称。例如:

如果嵌套结构中包含大量字段,您可以使用列表推导式,使用df.schema["state"].dataType.names获取字段名称。例如:

val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state', 
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)

参考资料

  • 我从这个答案中找到了一种不需要手动命名的方法来获取结构体中的字段名称。

我明白了,使用withColumnstruct替换为新的结构体,并复制旧字段。这很有效,谢谢!我想知道是否有一种方法可以向结构体添加字段,而无需命名所有现有子字段? - MrCartoonology
@MrCartoonology,我找到了更干净的方法来获取字段名称。请查看更新。 - pault

21

使用以下转换:

import pyspark.sql.functions as f

df = df.withColumn(
    "state",
    f.struct(
        f.col("state.*"),
        f.lit(123).alias("a")
    )
)

分析异常:只能对结构数据类型进行星号展开。属性:ArrayBuffer(state) - Blue Clouds
@BlueClouds,你的数据框架模式是什么(具体来说,在这种情况下,“state”列的类型是什么)? - malthe
状态是一个结构体 - Blue Clouds
你确定吗?因为你收到了一个错误消息,表明它不是一个结构数据类型。如果你打印模式,会发生什么呢? - malthe

5

3

Spark 3.1+

F.col('state').withField('a', F.lit(1))

例子:

from pyspark.sql import functions as F
df = spark.createDataFrame([((1,),)], 'state:struct<fld:int>')
df.printSchema()
# root
#  |-- state: struct (nullable = true)
#  |    |-- fld: integer (nullable = true)

df = df.withColumn('state', F.col('state').withField('a', F.lit(1)))
df.printSchema()
# root
#  |-- state: struct (nullable = true)
#  |    |-- fld: integer (nullable = true)
#  |    |-- a: integer (nullable = false)

类型错误:'Column'对象不可调用 - Blue Clouds
出现TypeError: 'Column' object is not callable的一个原因是使用的Spark版本低于3.1。 - undefined

1
你可以使用 struct 函数。
import pyspark.sql.functions as f

df = df.withColumn(
    "state",
    f.struct(
        f.col("state.fld").alias("fld"),
        f.lit(1).alias("a")
    )
)

1
这是一种不使用UDF的方法。
初始化示例数据帧:
nested_df1 = (spark.read.json(sc.parallelize(["""[
        { "state": {"fld": 1} },
        { "state": {"fld": 2}}
    ]"""])))

nested_df1.printSchema()

root
 |-- state: struct (nullable = true)
 |    |-- fld: long (nullable = true)

Spark的.read.json默认将所有整数导入为long。 如果state.fld必须是int,则需要进行转换。

from pyspark.sql import functions as F

nested_df1 = (nested_df1
    .select( F.struct(F.col("state.fld").alias("fld").cast('int')).alias("state") ))

nested_df1.printSchema()

root
 |-- state: struct (nullable = false)
 |    |-- col1: integer (nullable = true)

nested_df1.show()

+-----+
|state|
+-----+
|  [1]|
|  [2]|
+-----+

最后

使用.select方法,通过"parent.child"符号来获取现有结构中所需的嵌套列,创建新列,然后使用struct将旧列和新列重新包装在一起。

val_a = 3

nested_df2 = (nested_df
    .select( 
        F.struct(
            F.col("state.fld"), 
            F.lit(val_a).alias("a")
        ).alias("state")
    )
)


nested_df2.printSchema()

root
 |-- state: struct (nullable = false)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = false)

nested_df2.show()

+------+
| state|
+------+
|[1, 3]|
|[2, 3]|
+------+

如有需要,请使用"parent.*"来展开。
nested_df2.select("state.*").printSchema()

root
 |-- fld: integer (nullable = true)
 |-- a: integer (nullable = false)

nested_df2.select("state.*").show()

+---+---+
|fld|  a|
+---+---+
|  1|  3|
|  2|  3|
+---+---+

-2
from pyspark.sql.functions import *
from pyspark.sql.types import *
def add_field_in_dataframe(nfield, df, dt): 
    fields = nfield.split(".")
    print fields
    n = len(fields)
    addField = fields[0]  
    if n == 1:
        return df.withColumn(addField, lit(None).cast(dt))

    nestedField = ".".join(fields[:-1])
    sfields = df.select(nestedField).schema[fields[-2]].dataType.names
    print sfields
    ac = col(nestedField)
    if n == 2:
        nc = struct(*( [ac[c].alias(c) for c in sfields] + [lit(None).cast(dt).alias(fields[-1])]))
    else:
        nc = struct(*( [ac[c].alias(c) for c in sfields] + [lit(None).cast(dt).alias(fields[-1])])).alias(fields[-2])
    print nc
    n = n - 1

    while n > 1: 
        print "n: ",n
        fields = fields[:-1]
        print "fields: ", fields
        nestedField = ".".join(fields[:-1])
        print "nestedField: ", nestedField
        sfields = df.select(nestedField).schema[fields[-2]].dataType.names
        print fields[-1]
        print "sfields: ", sfields
        sfields = [s for s in sfields if s != fields[-1]]
        print "sfields: ", sfields
        ac = col(".".join(fields[:-1]))
        if n > 2: 
            print fields[-2]
            nc = struct(*( [ac[c].alias(c) for c in sfields] + [nc])).alias(fields[-2])
        else:
            nc = struct(*( [ac[c].alias(c) for c in sfields] + [nc]))
        n = n - 1
    return df.withColumn(addField, nc)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接