如何在对dataframe使用groupby
后,使用collect_set
或collect_list
函数。例如: df.groupby('key').collect_set('values')
。但是出现了错误:AttributeError: 'GroupedData' object has no attribute 'collect_set'
如何在对dataframe使用groupby
后,使用collect_set
或collect_list
函数。例如: df.groupby('key').collect_set('values')
。但是出现了错误:AttributeError: 'GroupedData' object has no attribute 'collect_set'
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
sc = SparkContext("local")
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame([
("a", None, None),
("a", "code1", None),
("a", "code2", "name2"),
], ["id", "code", "name"])
df.show()
+---+-----+-----+
| id| code| name|
+---+-----+-----+
| a| null| null|
| a|code1| null|
| a|code2|name2|
+---+-----+-----+
注意,在上述代码中你需要创建一个HiveContext。参考https://dev59.com/7lsW5IYBdhLWcg3wATHe#35529093以处理不同版本的Spark。
(df
.groupby("id")
.agg(F.collect_set("code"),
F.collect_list("name"))
.show())
+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
| a| [code1, code2]| [name2]|
+---+-----------------+------------------+
如果您的数据框很大,可以尝试使用pandas udf(GROUPED_AGG)来避免内存错误。这样做也会更快。
分组聚合 Pandas UDF 与 Spark 聚合函数类似。分组聚合 Pandas UDF 与 groupBy().agg() 和 pyspark.sql.Window 一起使用。它定义了从一个或多个 pandas.Series 到标量值的聚合,其中每个 pandas.Series 表示组或窗口中的列。pandas udf
例如:
import pyspark.sql.functions as F
@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
return ', '.join(name)
grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))