使用groupby的pyspark collect_set或collect_list

81

如何在对dataframe使用groupby后,使用collect_setcollect_list函数。例如: df.groupby('key').collect_set('values')。但是出现了错误:AttributeError: 'GroupedData' object has no attribute 'collect_set'


2
你可以发布一些样本数据,以便我们调试您的问题吗? - Katya Willard
在Pyspark中它运行良好,顺便说一下,我正在尝试精确地将这项工作翻译成Scala Spark https://johnpaton.net/posts/forward-fill-spark/(我的意思是工作的范围是回填和向前填充,这就是在Pyspark中它是如何工作的)。 - Olfa2
2个回答

157
你需要使用agg。示例:
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

注意,在上述代码中你需要创建一个HiveContext。参考https://dev59.com/7lsW5IYBdhLWcg3wATHe#35529093以处理不同版本的Spark。

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+

35
collect_set() 返回不重复的元素集合,而 collect_list() 返回所有元素(除了空值)。 - Grant Shannon
使用collect_set或collect_list函数的size函数更适合计算计数值,或者使用普通的count函数。我正在使用窗口来获取附加到帐户的交易计数。 - user3858193
3
当我在列表中有多个列,例如:agg(collect_list(struct(df.f1,df.f2,df.f3))),如何将collect_list的输出作为字典输出。每个组的输出应为[f1:value,f2:value,f3:value]。 - Immanuel Fredrick
在处理大型数据框时,collect_set 似乎无法为组收集正确的值。有什么想法吗? - haneulkim

-4

如果您的数据框很大,可以尝试使用pandas udf(GROUPED_AGG)来避免内存错误。这样做也会更快。

分组聚合 Pandas UDF 与 Spark 聚合函数类似。分组聚合 Pandas UDF 与 groupBy().agg() 和 pyspark.sql.Window 一起使用。它定义了从一个或多个 pandas.Series 到标量值的聚合,其中每个 pandas.Series 表示组或窗口中的列。pandas udf

例如:

import pyspark.sql.functions as F

@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    return ', '.join(name)

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))

15
我认为自定义的UDF并不比Spark内置函数更快。 - jwdink
2
我知道pandas UDF比Spark内置函数慢得多(而且,pandas UDF需要从您的集群中占用更多内存)!什么更快,纯Java/Scala还是Java必须调用Python来处理数据结构,同时还必须通过Arrow序列化为pandas DF? - Marco

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接