使用Pyspark按列分组元素并创建字典

4

我有一个从csv文件读取的Spark DataFrame,读取方式如下:

df = ss.read \
     .format("csv") \
     .option("delimiter", ";") \
     .option("header", "false") \
     .option("inferSchema", "true") \
     .option("escape", "\"") \
     .option("multiline", "true") \
     .option("wholeFile", "true") \
     .load(file_path)

数据框的格式如下:
|cod_cli|article_name|rank|
|123    |art_1       |1   |
|123    |art_2       |2   |
|123    |art_3       |3   |
|456    |art_4       |1   |
|456    |art_5       |2   |
|456    |art_6       |3   |

我希望按照 cod_cli 列对元素进行分组,并创建多个列,每个列对应分组集合中的一个产品,以字典键值对的形式表示,其中键为列名,值为与该列名相关联的数值,形式如下:

|cod_cli|Product 1                  |Product 2                  |Product 3                  |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|

字典的值可以是字符串(更好)或地图。

我尝试了这种方式:

df = df \
     .groupBy(F.col("cod_cli")) \
     .agg(F.collect_list(F.array("cod_art","rank")))

但是这样做,我会创建一个具有所有分组元素的数组列的列。

请问是否有人可以帮助我?

谢谢

更新

提出的解决方案如下:

df = df.withColumn(
            "Product",
            F.to_json(
                F.struct(F.col("cod_art"), F.col("rank"))
            )
        )

我这样创建了一个名为“Product”的列,并使用所需的json字符串示例创建了它,例如{cod_art: art_1, rank: 1}

然后:

df = df \
     .groupBy(F.col("cod_cli")) \
     .pivot("rank") \
     .agg(F.first("Product"))

通过这种方式,我可以为每个产品创建一列,按照cod_cli属性进行分组,并处理有多于3个产品的列的情况:

|cod_cli|1                          |2                          |3               
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
2个回答

3

您可以使用collect_liststruct,而无需进行昂贵的操作,然后使用create_mapto_json将其转换为JSON格式。

from pyspark.sql import functions as F

df\
  .groupBy("cod_cli").agg(F.collect_list(F.struct("article_name","rank"))\
                          .alias("arr"))\
  .select("cod_cli", *(F.to_json(F.create_map(F.lit("cod_art"),(F.col("arr.article_name")[x]),F.lit("rank"),(F.col("arr.rank")[x])))\
                       .alias("Product{}".format(x+1)) for x in range(3)))\
  .show(truncate=False)

#+-------+------------------------------+------------------------------+------------------------------+
#|cod_cli|Product1                      |Product2                      |Product3                      |
#+-------+------------------------------+------------------------------+------------------------------+
#|123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
#|456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
#+-------+------------------------------+------------------------------+------------------------------+

2
也许这会有用——

加载提供的数据

 val data =
      """
        |cod_cli|article_name|rank
        |123    |art_1       |1
        |123    |art_2       |2
        |123    |art_3       |3
        |456    |art_4       |1
        |456    |art_5       |2
        |456    |art_6       |3
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
            .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()

    /**
      * +-------+------------+----+
      * |cod_cli|article_name|rank|
      * +-------+------------+----+
      * |123    |art_1       |1   |
      * |123    |art_2       |2   |
      * |123    |art_3       |3   |
      * |456    |art_4       |1   |
      * |456    |art_5       |2   |
      * |456    |art_6       |3   |
      * +-------+------------+----+
      *
      * root
      * |-- cod_cli: integer (nullable = true)
      * |-- article_name: string (nullable = true)
      * |-- rank: integer (nullable = true)
      */

使用pivotfirst指定创建列(应在pyspark中实现,最小更改都是pyspark.sql.functions

    df.groupBy("cod_cli")
      .pivot("rank")
      .agg(first("article_name"))
      .select($"cod_cli", $"1".as("Product 1"), $"2".as("Product 2"), $"3".as("Product 3"))
      .withColumn("Product 1", to_json(expr("named_struct('cod_art', `Product 1`, 'rank', '1')")))
      .withColumn("Product 2", to_json(expr("named_struct('cod_art', `Product 2`, 'rank', '2')")))
      .withColumn("Product 3", to_json(expr("named_struct('cod_art', `Product 3`, 'rank', '3')")))
      .show(false)

    /**
      * +-------+------------------------------+------------------------------+------------------------------+
      * |cod_cli|Product 1                     |Product 2                     |Product 3                     |
      * +-------+------------------------------+------------------------------+------------------------------+
      * |123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
      * |456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
      * +-------+------------------------------+------------------------------+------------------------------+
      */

1
感谢您的回答。我按照您的解决方案来改进我的问题并使其更加动态化(可能未定义“产品”的数量)。我更新了原始信息。 - br1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接