使用Python的reduce()函数来合并多个PySpark数据框。

5

有人知道为什么使用Python3的functools.reduce()在连接多个PySpark数据框时会导致性能变差,而仅使用for循环迭代地连接相同的数据框却不会?具体来说,这会导致严重的减速,然后出现内存溢出错误:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

相比之下,这个没有:
joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

任何想法都将不胜感激。谢谢!
2个回答

2
只要您使用CPython(不同的实现可以,但在这种情况下实际上不应该表现出显著不同的行为)。如果您查看{{link1:reduce implementation}},您会发现它只是一个带有最小异常处理的for循环。
核心与您使用的循环完全等价。
for element in it:
    value = function(value, element)

“而且没有证据支持任何特殊行为的主张。”
“此外,使用帧数进行简单测试时,Spark连接的实际限制(连接是 Spark 中最昂贵的操作之一)。”
dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

在直接for循环之间的时间上没有显著差异。
def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)                 
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

"并且减少 'reduce' 函数的调用"
from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs) 

%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

类似的,for循环和reduce之间的JVM整体行为模式是可比较的。 For loop CPU and Memory Usage - VisualVM 以及 reduce CPU and Memory Usage - VisualVM 最后两者生成相同的执行计划。
g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

这句话的意思是:“当评估计划时没有差异,但很可能会发生OOM(内存不足)问题。”换句话说,你的相关性并不意味着因果关系,观察到的性能问题不太可能与你使用的方法来组合数据框有关。

这种方法能否使用左连接或外连接,如下所示: return reduce(lambda x, y: x.join(y, ["id"]), how="left", dfs) - thentangler

1
一个原因是reduce或fold通常是函数式纯的:每个累加操作的结果不会被写入同一部分内存,而是写入新的内存块。
原则上,垃圾收集器可以在每次累加后释放前一个块,但如果它没有这样做,你将为每个更新版本的累加器分配内存。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接