Pyspark - 合并两列集合

5
我有一个Spark DataFrame,其中有两列是通过collect_set函数形成的。我想将这两列集合合并成一列集合。它们都是字符串集合。
例如,我有两列是通过调用collect_set函数形成的。
Fruits                  |    Meat
[Apple,Orange,Pear]          [Beef, Chicken, Pork]

我该如何将其转换为:
Food

[Apple,Orange,Pear, Beef, Chicken, Pork]

非常感谢您的帮助。

1
请提供更多信息,例如数据帧的结构和示例。 - Avishek Bhattacharya
4个回答

9
我也在用Python解决这个问题,所以这里是Ramesh的解决方案的Python版本:
df = spark.createDataFrame([(['Pear','Orange','Apple'], ['Chicken','Pork','Beef'])],
                           ("Fruits", "Meat"))
df.show(1,False)

from pyspark.sql.functions import udf
mergeCols = udf(lambda fruits, meat: fruits + meat)
df.withColumn("Food", mergeCols(col("Fruits"), col("Meat"))).show(1,False)

输出:

+---------------------+---------------------+
|Fruits               |Meat                 |
+---------------------+---------------------+
|[Pear, Orange, Apple]|[Chicken, Pork, Beef]|
+---------------------+---------------------+
+---------------------+---------------------+------------------------------------------+
|Fruits               |Meat                 |Food                                      |
+---------------------+---------------------+------------------------------------------+
|[Pear, Orange, Apple]|[Chicken, Pork, Beef]|[Pear, Orange, Apple, Chicken, Pork, Beef]|
+---------------------+---------------------+------------------------------------------+

向Ramesh致敬!


编辑:请注意,您可能需要手动指定列类型(不确定为什么我在某些情况下没有显式类型规范的情况下可以工作 - 在其他情况下,我得到的是字符串类型列)。

from pyspark.sql.types import *
mergeCols = udf(lambda fruits, meat: fruits + meat, ArrayType(StringType()))

2

假设您的 dataframe 如下:

+---------------------+---------------------+
|Fruits               |Meat                 |
+---------------------+---------------------+
|[Pear, Orange, Apple]|[Chicken, Pork, Beef]|
+---------------------+---------------------+

您可以编写一个 udf 函数将两个列的集合合并为一个。
import org.apache.spark.sql.functions._
def mergeCols = udf((fruits: mutable.WrappedArray[String], meat: mutable.WrappedArray[String]) => fruits ++ meat)

然后调用udf函数,如下所示:

df.withColumn("Food", mergeCols(col("Fruits"), col("Meat"))).show(false)

您应该拥有您所需的最终数据框
+---------------------+---------------------+------------------------------------------+
|Fruits               |Meat                 |Food                                      |
+---------------------+---------------------+------------------------------------------+
|[Pear, Orange, Apple]|[Chicken, Pork, Beef]|[Pear, Orange, Apple, Chicken, Pork, Beef]|
+---------------------+---------------------+------------------------------------------+

这是用Python吗?我似乎找不到mutable.WrappedArray。 - soulless
这都是用Scala编写的 :) - Ramesh Maharjan

2
在这里添加解决方案,以定义不包含重复项的集合。还可以避免使用Python UDF时出现任何性能问题。
需要Spark 2.4+。
from pyspark.sql import functions as F
df = spark.createDataFrame([(['Chicken','Pork','Beef',"Tuna"], ["Salmon", "Tuna"])],
                           ("Meat", "Fish"))
df.show(1,False)
df_union = df.withColumn("set_union", F.array_distinct(F.array_union("Meat", "Fish")))
df_union.show(1, False)

结果是

+---------------------------+--------------+-----------------------------------+
|Meat                       |Fish          |set_union                          |
+---------------------------+--------------+-----------------------------------+
|[Chicken, Pork, Beef, Tuna]|[Salmon, Tuna]|[Chicken, Pork, Beef, Tuna, Salmon]|
+---------------------------+--------------+-----------------------------------+

0
假设 df 已经
+--------------------+--------------------+
|              Fruits|                Meat|
+--------------------+--------------------+
|[Pear, Orange, Ap...|[Chicken, Pork, B...|
+--------------------+--------------------+

那么

import itertools
df.rdd.map(lambda x: [item for item in itertools.chain(x.Fruits, x.Meat)]).collect()

创建一个包含FruitsMeat的集合,即:
[[u'Pear', u'Orange', u'Apple', u'Chicken', u'Pork', u'Beef']]


希望这能有所帮助!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接