基于列值变化分区pyspark dataframe

6

我有一个在pyspark中的数据框。 假设有一些列a,b,c... 我想将数据分组,使得列值发生变化。 比如说

A  B
1  x
1  y
0  x
0  y
0  x
1  y
1  x
1  y

将有3个组,分别为(1x,1y),(0x,0y,0x),(1y,1x,1y) 以及相应的行数据

1个回答

11

如果我理解正确,您希望在 A 列的值发生变化时创建一个不同的组。

首先,我们将创建一个单调递增的 ID,以保持行顺序不变:

import pyspark.sql.functions as psf
df = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']])\
    .toDF(['A', 'B'])\
    .withColumn("rn", psf.monotonically_increasing_id())
df.show()

    +---+---+----------+
    |  A|  B|        rn|
    +---+---+----------+
    |  1|  x|         0|
    |  1|  y|         1|
    |  0|  x|         2|
    |  0|  y|         3|
    |  0|  x|8589934592|
    |  1|  y|8589934593|
    |  1|  x|8589934594|
    |  1|  y|8589934595|
    +---+---+----------+

现在我们将使用窗口函数创建一个包含1的列,每当A列发生变化时:

from pyspark.sql import Window
w = Window.orderBy('rn')
df = df.withColumn("changed", (df.A != psf.lag('A', 1, 0).over(w)).cast('int'))

    +---+---+----------+-------+
    |  A|  B|        rn|changed|
    +---+---+----------+-------+
    |  1|  x|         0|      1|
    |  1|  y|         1|      0|
    |  0|  x|         2|      1|
    |  0|  y|         3|      0|
    |  0|  x|8589934592|      0|
    |  1|  y|8589934593|      1|
    |  1|  x|8589934594|      0|
    |  1|  y|8589934595|      0|
    +---+---+----------+-------+

最后,我们将使用另一个窗口函数来为每个分组分配不同的数字:

df = df.withColumn("group_id", psf.sum("changed").over(w)).drop("rn").drop("changed")

    +---+---+--------+
    |  A|  B|group_id|
    +---+---+--------+
    |  1|  x|       1|
    |  1|  y|       1|
    |  0|  x|       2|
    |  0|  y|       2|
    |  0|  x|       2|
    |  1|  y|       3|
    |  1|  x|       3|
    |  1|  y|       3|
    +---+---+--------+

现在你可以创建你的群组


谢谢,这非常有帮助。 - Jugraj Singh
没问题Jugraj,别忘了将你的问题标记为已解决 :) - MaFF

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接