使用PySpark按多列进行分区,并将列放入列表中

22

我的问题与这个帖子类似: Spark SQL中按多列分区

但我使用的是Pyspark而不是Scala,我想将列名列表作为一个列表传递进去。我想要做的事情类似于这样:

column_list = ["col1","col2"]
win_spec = Window.partitionBy(column_list)

我可以让以下内容工作:

win_spec = Window.partitionBy(col("col1"))

这也可以运作:

col_name = "col1"
win_spec = Window.partitionBy(col(col_name))

这也可以:

win_spec = Window.partitionBy([col("col1"), col("col2")])
3个回答

29

使用列表推导式将列名转换为列表达式[col(x) for x in column_list]

from pyspark.sql.functions import col
from pyspark.sql import Window
column_list = ["col1","col2"]
win_spec = Window.partitionBy([col(x) for x in column_list])

3
更新给查看这个答案的人:较新版本的pyspark允许您像下面的答案一样传递一个列表。请参见@Naguveeru的答案。 - EnterPassword

8

PySpark >= 2.4,这也可以工作 =>

column_list = ["col1","col2"]

win_spec = Window.partitionBy(*column_list)

只是一个快速的问题,*代表column_list中的所有项目,我是否正确理解了代码? - Vanessa_C
2
@Vanessa_C,虽然有点晚了,但在Python中,“*”运算符用于将可迭代对象解包到函数调用中。通过使用*column_list,您无需手动传递“col1”,“col2”(即不必指定列表中的每个元素),也可以实现相同的效果。 - Quan Bui

6
你的第一次尝试应该是有效的。
考虑以下示例:
import pyspark.sql.functions as f
from pyspark.sql import Window

df = sqlCtx.createDataFrame(
    [
        ("a", "apple", 1),
        ("a", "orange", 2),
        ("a", "orange", 3),
        ("b", "orange", 3),
        ("b", "orange", 5)
    ],
    ["name", "fruit","value"]
)
df.show()
#+----+------+-----+
#|name| fruit|value|
#+----+------+-----+
#|   a| apple|    1|
#|   a|orange|    2|
#|   a|orange|    3|
#|   b|orange|    3|
#|   b|orange|    5|
#+----+------+-----+

假设您想通过前两列分组,计算每行的总和的一部分:
cols = ["name", "fruit"]
w = Window.partitionBy(cols)
df.select(cols + [(f.col('value') / f.sum('value').over(w)).alias('fraction')]).show()

#+----+------+--------+
#|name| fruit|fraction|
#+----+------+--------+
#|   a| apple|     1.0|
#|   b|orange|   0.375|
#|   b|orange|   0.625|
#|   a|orange|     0.6|
#|   a|orange|     0.4|
#+----+------+--------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接