如何在Spark SQL中压缩两个数组列

9

我有一个Pandas数据框。我尝试将包含字符串值的两列合并为一个列表,然后使用zip函数,将列表中的每个元素用“_”连接起来。我的数据集如下所示:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

我希望将数据框中每一行的这两列合并为第三列,如下所示。
df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

我已经成功使用下面的Python代码完成了此任务,但是数据框非常大,对整个数据框运行需要很长时间。为了提高效率,我希望在PySpark中完成同样的任务。我已经成功地将数据读入Spark数据框,但是我很难确定如何使用PySpark等效函数复制Pandas函数。如何在PySpark中获得所需的结果?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]

我已经使用以下代码将两列转换为PySpark中的数组:
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
    split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
    split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)

现在我需要使用 '_' 将两列中的每个元素进行压缩。如何使用 zip 实现?非常感谢任何帮助。

1
为什么 df['column_1']df['column_2'] 是单个字符串而不是项目列表?它们最初是什么? - Foxan Ng
这就是我正在从数据框中读取的数据。 - Falconic
@Falconic,abcdef等是在同一行还是不同行?同样,第二列是单独一行吗? - anky
@anky_91 这是数据框中列1和列2的一行。每行在一个列中有多个项目。这就是我将字符串拆分并转换为列表的原因。 - Falconic
这个回答解决了你的问题吗?Pyspark:将多个数组列拆分为行 - Ani Menon
4个回答

19

Spark SQL的等效于Python的函数是pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

集合函数: 返回一个合并数组,其中第N个结构包含输入数组的所有第N个值。

所以如果您已经有两个数组:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", "\s*,\s*"))
    .withColumn("column_2", split("column_2", "\s*,\s*")))

您只需将其应用于结果即可

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

现在,您可以使用 transform 来合并结果(如何使用 transform 高阶函数?TypeError:Column is not iterable - 如何迭代 ArrayType()?):

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
     expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
) 

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+
注意

Apache Spark 2.4引入了高阶函数transformarrays_zip


感谢 user10465355。这个解决方案对我很有用,但需要注意的是,它不能很好地处理列表中的空值。在将它们连接在一起之前,我手动从两列中删除了空值。其次,我必须在我的原始数据框中执行每个步骤。使用相同列名的多个数据框在 PySpark 中效果不佳。我必须调试代码以查看问题出在哪里。结果发现,我需要在不同操作中使用相同的数据框架。 - Falconic

5

对于Spark 2.4+,这可以仅使用zip_with函数在同一时间上进行拼接和压缩:

df.withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")) 

这个高阶函数需要两个数组作为输入,使用一个 lambda 函数 (x, y) -> concat(x, '_', y) 按元素合并它们。

4
你可以使用UDF将分割的数组列进行压缩。
df = spark.createDataFrame([('abc,def,ghi','1.0,2.0,3.0')], ['col1','col2']) 
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import *

def concat_udf(*args):
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf,ArrayType(StringType()))
df = df.withColumn('col3',udf1(F.split(df.col1,','),F.split(df.col2,',')))
df.show(1,False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+

谢谢@suresh。这绝对是一个更干净的解决方案。当我将其应用于我的数据框并运行collect函数时,我得到以下错误 TypeError: zip argument #1 must support iteration 有任何想法为什么会出现这种情况? - Falconic
错误是因为zip()没有得到一个可迭代的输入。请您提供样本输入数据框和其模式。 - Suresh

2

对于 Spark 3.1+,现在提供了 pyspark.sql.functions.zip_with()Python lambda 函数,因此可以这样做:

import pyspark.sql.functions as F

df = df.withColumn("column_3", F.zip_with("column_1", "column_2", lambda x,y: F.concat_ws("_", x, y)))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接