基于PySpark DataFrame的分组数据进行Pandas风格的转换

20
如果我们有一个Pandas数据框,其中包含一个类别列和一个值列,我们可以通过执行以下操作来删除每个类别中的平均值:
df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))
据我所知,Spark数据框架直接不提供这种分组/转换操作(我正在使用Spark 1.5.0上的PySpark)。那么,最好的实现方法是什么?
我尝试使用分组/连接来实现,如下所示:
df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

但是,据我理解,每个类别需要对DataFrame进行完整扫描,因此速度非常慢。

我认为(但没有验证),如果我将group-by/mean的结果收集到一个字典中,然后在UDF中使用该字典,可以大大加快速度:

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

有没有一种惯用的方式来表达这种操作而不牺牲性能?

3个回答

12
我知道了,每个类别都需要对DataFrame进行全面扫描。
不需要。DataFrame的聚合使用类似于aggregateByKey的逻辑进行。请参见DataFrame groupBy行为/优化。较慢的部分是join,需要排序/洗牌。但它仍然不需要每个组的扫描。
如果这是你使用的确切代码,它很慢,因为你没有提供一个连接表达式。由于这个原因,它只执行笛卡尔积。因此,它不仅效率低下,而且是不正确的。你需要像这样的东西:
from pyspark.sql.functions import col

means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))

我认为(但尚未验证)如果将group-by/mean的结果收集到字典中,然后在UDF中使用该字典,可以大大加快速度。

在不同情况下性能可能有所差异。使用Python UDF的一个问题是必须将数据移动到Python中并返回。但它肯定值得一试。您应该考虑使用广播变量nameToMean

有没有惯用的方法来表达这种类型的操作而不牺牲性能?

在PySpark 1.6中,您可以使用broadcast函数:

df.alias("df").join(
    broadcast(means), col("df.Category") == col("means.Category"))

但是它在版本<=1.5中不可用。


谢谢回复。我不知道df.join()中的笛卡尔积行为,我错误地假设默认行为是在任何共享相同名称的列上进行连接。通过为类别列添加别名并进行显式等式测试,可以大大加快速度。 - Peter Lubans
不客气。始终检查执行的扩展执行计划(df.explain(extended=True))是非常有用的。忽略配置的最常见问题与笛卡尔积相关,即使提供了连接表达式也可能无法优化。 - zero323

12
你可以使用Window来完成这个任务。
也就是说。
import pyspark.sql.functions as F
from pyspark.sql.window import Window

window_var = Window().partitionBy('Categroy')
df = df.withColumn('DemeanedValues', F.col('Values') - F.mean('Values').over(window_var))

4
实际上,Spark 中有一种惯用的方法可以使用 Hive 的 OVER 表达式来实现此目的。
即:
df.registerTempTable('df')
with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')

在底层,这是使用了窗口函数。虽然我不确定这是否比您的解决方案更快。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接