Spark - How to add an element to an array of structs

7

Given this schema:

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

How can we add a new field so that the schema looks like this?
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: integer (nullable = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

I have managed to do this with a plain struct (see the bottom of this post), but I can't do it with an array of structs.

Here is the test code:

val schema = new StructType()
    .add("Elems", ArrayType(new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    ))

val dataDS = Seq("""
{
  "Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()

val df = spark.read.schema(schema).json(dataDS.rdd)

df.show(false)
+---------------------------+
|Elems                      |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+

Once we have the DF, the best approach I have come up with is to create a struct of arrays, one array per field:

val mod_df = df.withColumn("modif_elems", 
     struct(
         array(lit("")).as("New_field"),
         col("Elems.Elem"),
         col("Elems.Desc")
                            ))

mod_df.show(false)
+---------------------------+-----------------------------+
|Elems                      |modif_elems                  |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+


mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: array (nullable = false)
 |    |    |-- element: string (containsNull = false)
 |    |-- Elem: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- Desc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

No data is lost, but this is not exactly what I want.

Update: a workaround is shown in PD1.


Extra: modifying a struct (not inside an array)

The code is almost the same, but since there is no array of structs this time, modifying the struct is easier:

val schema = new StructType()
    .add("Elems", new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    )


val dataDS = Seq("""
{
  "Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()    


val df = spark.read.schema(schema).json(dataDS.rdd)
df.show(false)
+-------+
|Elems  |
+-------+
|[1, d1]|
+-------+

df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)

In this case, to add the field we just need to create another struct:
val mod_df = df
    .withColumn("modif_elems", 
                struct(
                    lit("").alias("New_field"),
                    col("Elems.Elem"),
                    col("Elems.Desc")
                    )
               )

mod_df.show
+-------+-----------+
|  Elems|modif_elems|
+-------+-----------+
|[1, d1]|  [, 1, d1]|
+-------+-----------+

mod_df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: string (nullable = false)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)
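
For completeness (a hedged sketch, not part of the original question: it assumes Spark 3.1+, which ships the Column.withField API), this plain-struct case no longer requires rebuilding the struct by hand:

```scala
import org.apache.spark.sql.functions.{col, lit}

// Sketch, assuming Spark 3.1+ and the df defined just above:
// withField returns a copy of the struct with the extra field appended.
val mod_df2 = df.withColumn("Elems", col("Elems").withField("New_field", lit("")))
```

Note that withField appends a new field at the end of the struct; to control the field order you still need to rebuild the struct with struct(...).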


PD1:

OK, I used the Spark SQL function arrays_zip (new in version 2.4.0), and it is basically what I want, but I don't know how to rename the fields (as and alias have no effect here):

val mod_df = df.withColumn("modif_elems", 
        arrays_zip(
            array(lit("")).as("New_field"),
            col("Elems.Elem").as("Elem"),
            col("Elems.Desc").alias("Desc")
                    )
        )

mod_df.show(false)
+---------------------------+---------------------------------+
|Elems                      |modif_elems                      |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+

mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)

The struct inside modif_elems should contain three fields named New_field, Elem and Desc, not 0, 1 and 2.

2 Answers

9

Spark 3.1+

withField can be used together with transform.


  • Scala

    Input:

    val df = spark.createDataFrame(Seq((1, "2")))
        .select(
            array(struct(
                col("_1").as("Elem"),
                col("_2").as("Desc")
            )).as("Elems")
        )
    df.printSchema()
    // root
    //  |-- Elems: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- Elem: integer (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    

    Script:

    val df2 = df.withColumn(
        "Elems",
        transform(
            $"Elems",
            x => x.withField("New_field", lit(3))
        )
    )
    df2.printSchema()
    // root
    //  |-- Elems: array (nullable = false)
    //  |    |-- element: struct (containsNull = false)
    //  |    |    |-- Elem: long (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    //  |    |    |-- New_field: integer (nullable = false)
    
  • PySpark

    Input:

    from pyspark.sql import functions as F
    df = spark.createDataFrame([(1, "2",)]) \
        .select(
            F.array(F.struct(
                F.col("_1").alias("Elem"),
                F.col("_2").alias("Desc")
            )).alias("Elems")
        )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- Elem: integer (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    

    Script:

    df = df.withColumn(
        "Elems",
        F.transform(
            F.col("Elems"),
            lambda x: x.withField("New_field", F.lit(3))
        )
    )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = false)
    #  |    |-- element: struct (containsNull = false)
    #  |    |    |-- Elem: long (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    #  |    |    |-- New_field: integer (nullable = false)
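
The new field does not have to be a literal: since transform hands each element to the lambda as a struct column, the added value can be derived from the element itself. A hedged sketch (assuming Spark 3.1+ and the Scala df from the Input snippet above; the doubling is only for illustration):

```scala
import org.apache.spark.sql.functions.{col, transform}

// Sketch: populate New_field from the element's own Elem field (Elem * 2)
// instead of a constant. Requires Spark 3.1+ for withField.
val df3 = df.withColumn(
    "Elems",
    transform(
        col("Elems"),
        x => x.withField("New_field", x.getField("Elem") * 2)
    )
)
```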
    

3

Here is the solution. We need to use arrays_zip and then cast the resulting column to an array of the recomposed struct with the renamed fields (elem_struct_recomposed):


val elem_struct_recomposed = new StructType()
  .add("New_field", StringType)
  .add("ElemRenamed", IntegerType)
  .add("DescRenamed", StringType)


val mod_df = df
    .withColumn("modif_elems_NOT_renamed", 
        arrays_zip(
            array(lit("")).as("New_field"),
            col("Elems.Elem").as("ElemRenamed"),
            col("Elems.Desc").alias("DescRenamed")
                    ))
    .withColumn("modif_elems_renamed", 
               $"modif_elems_NOT_renamed".cast(ArrayType(elem_struct_recomposed)))


mod_df.show(false)
mod_df.printSchema

+---------------------------+---------------------------------+---------------------------------+
|Elems                      |modif_elems_NOT_renamed          |modif_elems_renamed              |
+---------------------------+---------------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+---------------------------------+

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems_NOT_renamed: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)
 |-- modif_elems_renamed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: string (nullable = true)
 |    |    |-- ElemRenamed: integer (nullable = true)
 |    |    |-- DescRenamed: string (nullable = true)
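
Once the cast is applied, the renamed fields can be addressed by name like any other struct fields. A minimal hedged sketch (assuming the mod_df built above):

```scala
import org.apache.spark.sql.functions.{col, explode}

// Sketch: explode the renamed array and read the struct fields by their new names.
val flat = mod_df
    .select(explode(col("modif_elems_renamed")).as("e"))
    .select(col("e.New_field"), col("e.ElemRenamed"), col("e.DescRenamed"))
```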


Hi @rvilla, how can we use arrays_zip on Spark 2.3? Any help would be appreciated. - Anil Kumar
Sorry, I haven't tried it, but you could implement those functions following the GitHub source code: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala Note that ArraysZip is a case class from org/apache/spark/sql/catalyst/expressions/collectionOperations.scala. You can open the arrays_zip function source in your favorite code editor to see it. - rvilla
@rvilla, regarding Spark 2.3, see my answer: https://stackoverflow.com/questions/61919972/how-to-handle-missing-nested-fields-in-spark/61924562#61924562 - Noam Shaish
If the "Elems" column has more than one element, this approach produces null values in "New_Field". - puligun
What is elem_struct_recomposed? - sohil
Sorry sohil, I forgot to specify the renamed struct schema. Fixed (I tested it locally with Spark 3.1.2 without problems). Thanks! - rvilla
