为什么我的Spark DataFrame比RDD慢得多?

4

我有一个非常简单的 Spark DataFrame,在运行 DataFrame groupby 时,性能非常差 - 大约比(在我的脑海中)等价的 RDD reduceByKey 慢了8倍...

我的缓存 DF 只有两列,客户和名称,只有5万行:

== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None

当我运行以下两个代码片段时,我期望它们具有相似的性能,但实际上RDD版本只需要10秒钟就能运行完,而DF版本需要85秒钟...
rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect()

rawtempDF2.groupby('name').count().collect()

我是否漏掉了一些非常基础的东西?值得一提的是,RDD版本运行了54个阶段,而DF版本则为227个:/

编辑:我正在使用Spark 1.6.1和Python 3.4.2。 编辑2:此外,源Parquet已经分区为客户/日期/名称 - 目前有27个客户端,1天,大约45个名称。


这两个数字似乎过高了。你如何运行此代码并测量时间? - zero323
我是从Jupyter笔记本中运行的,并从SparkUI中获取作业运行时间。后端是Mesos(由比我更优秀的人构建),我的Spark实例具有24个核心和99GB RAM。我在所有这些方面都是新手,因此仍在学习最佳的时间/测试方法... - RichD
1个回答

9
两个数字似乎都比较高,而且不清楚您是如何创建DataFrame或测量时间的,但通常这样的差异可以解释为记录数相对于分区数过低所致。默认的spark.sql.shuffle.partitions值为200,它确定了您获得的任务数量。当有50K条记录时,启动任务的开销将会比并行执行带来的加速更高。我们用一个简单的例子来说明这一点。首先让我们创建一个示例数据:
import string
import random

random.seed(323)

def random_string():
  n = random.randint(3, 6)
  return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )

df = (sc
    .parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
    .cache())

根据shuffle.partitions的数量来衡量时间:

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 504 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 451 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 624 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 778 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 1.75 s per loop

尽管这些值与你所声称的不可比,且此数据是在本地模式下收集的,但你可以看到相对清晰的模式。RDD同样适用:
from operator import add

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop

在一个合适的分布式环境中,由于网络 IO 成本高,这个值将会更高。
仅供比较,让我们来查看在没有 Spark 的情况下本地执行此任务需要多长时间。
from collections import Counter

data = df.rdd.flatMap(lambda x: x).collect()

%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop

同时,您还应该考虑数据本地性。根据您使用的存储和配置,即使是这样小的输入,它也可能会给您的作业增加额外的延迟。


出色的演示表明问题出在哪里。100次洗牌运行每个循环需要31秒(使用6节点mesos集群上运行的Spark)-那绝对感觉不对,但在向基础架构团队抱怨之前,我将使用更大的数据集/不同的分区进行更多测试 ;) - RichD
参考:分区/平均运行时间:1 / 1.53秒,10 / 2.6秒,100 / 31.3秒,200 / 65秒。 - RichD
嗯,我相当确信这就是你需要解释比例的全部内容,但绝对值似乎完全不对。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接