如何在Pyspark中展开数据框的多列

42

我有一个类似以下内容的包含列表的数据框。所有列中列表的长度不同。

Name  Age  Subjects                  Grades
[Bob] [16] [Maths,Physics,Chemistry] [A,B,C]

我希望将数据框按照特定方式展开,得到以下输出 -

Name Age Subjects Grades
Bob  16   Maths     A
Bob  16  Physics    B
Bob  16  Chemistry  C

我该如何实现这个目标?


2
你想将给定数组中的索引与行中的其他数组匹配吗?就像 数学-> A物理-> B化学-> C。因此,类似 数学-> B 的东西是错误的。 - Tanjin
是的,@Tanjin那样做是错误的。 - Visualisation App
7个回答

67

PySpark 2.4版本新增了arrays_zip函数,可消除使用Python UDF压缩数组的必要。

import pyspark.sql.functions as F
from pyspark.sql.types import *

df = sql.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])
df = df.withColumn("new", F.arrays_zip("Subjects", "Grades"))\
       .withColumn("new", F.explode("new"))\
       .select("Name", "Age", F.col("new.Subjects").alias("Subjects"), F.col("new.Grades").alias("Grades"))
df.show()

+-----+----+---------+------+
| Name| Age| Subjects|Grades|
+-----+----+---------+------+
|[Bob]|[16]|    Maths|     A|
|[Bob]|[16]|  Physics|     B|
|[Bob]|[16]|Chemistry|     C|
+-----+----+---------+------+

18

这有效。

import pyspark.sql.functions as F
from pyspark.sql.types import *

df = sql.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])
df.show()

+-----+----+--------------------+---------+
| Name| Age|            Subjects|   Grades|
+-----+----+--------------------+---------+
|[Bob]|[16]|[Maths, Physics, ...|[A, B, C]|
+-----+----+--------------------+---------+

使用 udfzip。 需要 explode 的列必须在爆炸之前合并。

combine = F.udf(lambda x, y: list(zip(x, y)),
              ArrayType(StructType([StructField("subs", StringType()),
                                    StructField("grades", StringType())])))

df = df.withColumn("new", combine("Subjects", "Grades"))\
       .withColumn("new", F.explode("new"))\
       .select("Name", "Age", F.col("new.subs").alias("Subjects"), F.col("new.grades").alias("Grades"))
df.show()


+-----+----+---------+------+
| Name| Age| Subjects|Grades|
+-----+----+---------+------+
|[Bob]|[16]|    Maths|     A|
|[Bob]|[16]|  Physics|     B|
|[Bob]|[16]|Chemistry|     C|
+-----+----+---------+------+

1
如果我需要将A、B和C放在不同的列而不是行中,我该怎么办? - Naveen Srikanth
1
UDFs并不是高效且性能良好的。如果存在pyspark API解决方案,应该避免使用它们。 - ARCrow

12

迟到的客人 :-)

最简单的方法是使用inline,它没有Python API,但被selectExpr支持。

df.selectExpr('Name[0] as Name','Age[0] as Age','inline(arrays_zip(Subjects,Grades))').show()

+----+---+---------+------+
|Name|Age| Subjects|Grades|
+----+---+---------+------+
| Bob| 16|    Maths|     A|
| Bob| 16|  Physics|     B|
| Bob| 16|Chemistry|     C|
+----+---+---------+------+

1

你试过这个吗

df.select(explode(split(col("Subjects"))).alias("Subjects")).show()

你可以将数据框转换为RDD。
对于RDD,您可以使用flatMap函数来分离主题。

我尝试使用平面图作为df.rdd.flatMap(lambda x: zip(*[x[c] for c in dcols])).toDF(dcols),但它只给了我第一行并忽略了其余的行- |16 |A |Bob |Maths |。 - Visualisation App

0

如果您需要在数据集中的许多列上快速轻松地重复此操作,请使用复制/粘贴功能

cols = ["word", "stem", "pos", "ner"]

def explode_cols(self, data, cols):
    data = data.withColumn('exp_combo', f.arrays_zip(*cols))
    data = data.withColumn('exp_combo', f.explode('exp_combo'))
    for col in cols:
        data = data.withColumn(col, f.col('exp_combo.' + col))

    return data.drop(f.col('exp_combo'))

result = explode_cols(data, cols)

不用谢 :)


arrays_zip不接受列表作为输入。 - Quynh-Mai Chu
@Quynh-MaiChu,你需要像上面函数f.arrays_zip(*cols)中写的那样使用星号表达式,这样Python才能将你的列表处理为参数而不是列表对象。如果你觉得更清晰,也可以使用f.col('col1')、f.col('col2')来进行arrays_zip操作。 - nasty

0

感谢 @nasty 挽救了这一天。 只需要进行小的调整就可以让代码正常工作。

def explode_cols( df, cl):
df = df.withColumn('exp_combo', arrays_zip(*cl))
df = df.withColumn('exp_combo', explode('exp_combo'))
for colm in cl:
    final_col = 'exp_combo.'+ colm 
    df = df.withColumn(final_col, col(final_col))
    
    #print col
    #print ('exp_combo.'+ colm)
return df.drop(col('exp_combo'))

0

当拆分多列时,上述解决方案仅在数组长度相同时非常有用,但如果它们不同。 最好分别拆分它们,并每次取不同的值。

df = sql.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])

df = df.withColumn('Subjects',F.explode('Subjects')).select('Name','Age','Subjects', 'Grades').distinct()

df = df.withColumn('Grades',F.explode('Grades')).select('Name','Age','Subjects', 'Grades').distinct()

df.show()

 +----+---+---------+------+
|Name|Age| Subjects|Grades|
+----+---+---------+------+
| Bob| 16|    Maths|     A|
| Bob| 16|  Physics|     B|
| Bob| 16|Chemistry|     C|
+----+---+---------+------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接