PySpark将结构数组转换为字符串

3

我在Pyspark中有以下数据框:

+----+-------+-----+                                                            
|name|subject|score|
+----+-------+-----+
| Tom|   math|   90|
| Tom|physics|   70|
| Amy|   math|   95|
+----+-------+-----+

我使用了来自 pyspark.sql.functionscollect_liststruct 函数。

df.groupBy('name').agg(collect_list(struct('subject', 'score')).alias('score_list'))

获取以下数据框

+----+--------------------+
|name|          score_list|
+----+--------------------+
| Tom|[[math, 90], [phy...|
| Amy|        [[math, 95]]|
+----+--------------------+

我的问题是如何将最后一列score_list转换成字符串并将其导出为一个csv文件,看起来像这样:
Tom     (math, 90) | (physics, 70)
Amy     (math, 95)

感谢任何帮助,谢谢。

更新:这里有一个类似的问题,但并不完全相同,因为它直接从string转到另一个string。在我的情况下,我想先将string转换为collect_list<struct>,然后再将此collect_list<struct>字符串化。


使用concat_ws函数 - 查找重复项。 - pault
你的Spark版本是多少? - jxc
@jxc 我使用的是 Spark 2.4.3。 - FrancisYL
2个回答

7

根据您的更新和评论,对于Spark 2.4.0+,以下是使用Spark SQL内置函数将结构数组转为字符串的一种方法:transformarray_join

>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- score_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- subject: string (nullable = true)
 |    |    |-- score: integer (nullable = true)

>>> df.show(2,0)
+----+---------------------------+
|name|score_list                 |
+----+---------------------------+
|Tom |[[math, 90], [physics, 70]]|
|Amy |[[math, 95]]               |
+----+---------------------------+

>>> df1.selectExpr(
        "name"
      , """
         array_join(
             transform(score_list, x -> concat('(', x.subject, ', ', x.score, ')'))
           , ' | '
         ) AS score_list
        """
).show(2,0)

+----+--------------------------+
|name|score_list                |
+----+--------------------------+
|Tom |(math, 90) | (physics, 70)|
|Amy |(math, 95)                |
+----+--------------------------+

说明:

  1. 使用 transform() 将结构体数组转换为字符串数组。对于每个数组元素(结构体 x),我们使用 concat('(', x.subject, ', ', x.score, ')') 将其转换为字符串。
  2. 使用 array_join() 将所有的字符串数组元素(StringType)以 | 连接起来,这将返回最终的字符串。

4

我链接的重复内容并不能完全回答你的问题,因为你正在合并多个列。尽管如此,你可以很容易地修改这些解决方案来适应你所需的输出。

只需将struct替换为concat_ws。同时使用concat添加一个开放和关闭括号,以获得你想要的输出。

from pyspark.sql.functions import concat, concat_ws, lit

df = df.groupBy('name')\
    .agg(
        concat_ws(
            " | ", 
            collect_list(
                concat(lit("("), concat_ws(", ", 'subject', 'score'), lit(")"))
            )
        ).alias('score_list')
    )
df.show(truncate=False)

#+----+--------------------------+
#|name|score_list                |
#+----+--------------------------+
#|Tom |(math, 90) | (physics, 70)|
#|Amy |(math, 95)                |
#+----+--------------------------+

请注意,由于逗号出现在score_list列中,如果您使用默认参数写入csv,此值将被引用。
例如:
df.coalesce(1).write.csv("test.csv")

将会生成以下输出文件:
Tom,"(math, 90) | (physics, 70)"
Amy,"(math, 95)"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接