两个数字似乎都比较高,而且不清楚您是如何创建DataFrame或测量时间的,但通常这样的差异可以解释为记录数相对于分区数过低所致。默认的spark.sql.shuffle.partitions值为200,它确定了您获得的任务数量。当有50K条记录时,启动任务的开销将会比并行执行带来的加速更高。我们用一个简单的例子来说明这一点。首先让我们创建一个示例数据:
import string
import random
random.seed(323)
def random_string():
n = random.randint(3, 6)
return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )
df = (sc
.parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
.cache())
根据shuffle.partitions
的数量来衡量时间:
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10 df.groupby('name').count().collect()
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10 df.groupby('name').count().collect()
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10 df.groupby('name').count().collect()
尽管这些值与你所声称的不可比,且此数据是在本地模式下收集的,但你可以看到相对清晰的模式。RDD同样适用:
from operator import add
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
在一个合适的分布式环境中,由于网络 IO 成本高,这个值将会更高。
仅供比较,让我们来查看在没有 Spark 的情况下本地执行此任务需要多长时间。
from collections import Counter
data = df.rdd.flatMap(lambda x: x).collect()
%timeit -n 10 Counter(data)
同时,您还应该考虑数据本地性。根据您使用的存储和配置,即使是这样小的输入,它也可能会给您的作业增加额外的延迟。