摘要:
一般而言,迭代算法,特别是带有自连接或自联合的算法,需要控制以下内容:
此处描述的问题是前者缺乏的结果。在每次迭代中,自连接会导致分区数增加,从而形成指数模式。为了解决这个问题,您必须要么控制每次迭代中的分区数(请参阅下文),或使用全局工具,例如spark.default.parallelism
(请参阅Travis提供的 答案)。总体而言,第一种方法通常提供更多的控制,并且不影响代码的其他部分。
原始回答:
据我所知,这里有两个交织在一起的问题-分区数量的增加和连接期间的洗牌开销。这两个问题都可以轻松处理,因此让我们逐步进行。
首先,让我们创建一个帮助程序来收集统计信息:
import datetime
def get_stats(i, init, init2, init3, init4,
start, end, desc, cache, part, hashp):
return {
"i": i,
"init": init.getNumPartitions(),
"init1": init2.getNumPartitions(),
"init2": init3.getNumPartitions(),
"init4": init4.getNumPartitions(),
"time": str(end - start),
"timen": (end - start).seconds + (end - start).microseconds * 10 **-6,
"desc": desc,
"cache": cache,
"part": part,
"hashp": hashp
}
另一个帮助处理缓存/分区的辅助程序
def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
rdd = rdd if not part else rdd.repartition(npart)
rdd = rdd if not hashp else rdd.partitionBy(npart)
return rdd if not cache else rdd.cache()
提取流程逻辑:
def run(init, description, cache=True, part=False, hashp=False,
npart=16, n=6):
times = []
for i in range(n):
start = datetime.datetime.now()
init2 = procRDD(
init.map(lambda n: (n, n*3)),
cache, part, hashp, npart)
init3 = procRDD(
init.map(lambda n: (n, n*2)),
cache, part, hashp, npart)
init4 = init2.join(init3, npart) if part else init2.join(init3)
init = init4.map(lambda n: n[0])
if cache:
init4.cache()
init.cache()
init.count()
end = datetime.datetime.now()
times.append(get_stats(
i, init, init2, init3, init4,
start, end, description,
cache, part, hashp
))
return times
并创建初始数据:
ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()
如果没有提供numPartitions
参数,则通过自身的连接操作根据输入RDD的分区数来调整输出的分区数。这意味着每次迭代都会增加分区数。如果分区数太多,事情就会变得很丑陋。您可以通过为连接或重新分区RDD提供numPartitions
参数来解决这些问题。
timesCachePart = sqlContext.createDataFrame(
run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+-----------------+
|0| 16| 16| 16|0:00:01.145625|cache + partition|
|1| 16| 16| 16|0:00:01.090468|cache + partition|
|2| 16| 16| 16|0:00:01.059316|cache + partition|
|3| 16| 16| 16|0:00:01.029544|cache + partition|
|4| 16| 16| 16|0:00:01.033493|cache + partition|
|5| 16| 16| 16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+
如您所见,当我们重新分区时,执行时间基本保持不变。
第二个问题是数据被随机分区的。为了确保连接性能,我们希望在单个分区上具有相同的键。为了实现这一点,我们可以使用哈希分区器:
timesCacheHashPart = sqlContext.createDataFrame(
run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+
|i|init1|init2|init4| time| desc|
+-+
|0| 16| 16| 16|0:00:00.946379|cache + hashpart|
|1| 16| 16| 16|0:00:00.966519|cache + hashpart|
|2| 16| 16| 16|0:00:00.945501|cache + hashpart|
|3| 16| 16| 16|0:00:00.986777|cache + hashpart|
|4| 16| 16| 16|0:00:00.960989|cache + hashpart|
|5| 16| 16| 16|0:00:01.026648|cache + hashpart|
+-+
执行时间与以前一样恒定,并且在基本分区的基础上有微小的改进。
现在只使用缓存作为参考:
timesCacheOnly = sqlContext.createDataFrame(
run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+
|i|init1|init2|init4| time| desc|
+-+
|0| 16| 16| 32|0:00:00.992865|cache-only|
|1| 32| 32| 64|0:00:01.766940|cache-only|
|2| 64| 64| 128|0:00:03.675924|cache-only|
|3| 128| 128| 256|0:00:06.477492|cache-only|
|4| 256| 256| 512|0:00:11.929242|cache-only|
|5| 512| 512| 1024|0:00:23.284508|cache-only|
+-+
正如您所看到的,缓存版本的分区数量(init2、init3、init4)每次迭代都会翻一番,并且执行时间与分区数量成比例。
最后,我们可以通过使用哈希分区来检查是否可以在有大量分区时提高性能:
timesCacheHashPart512 = sqlContext.createDataFrame(
run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
"i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+--------------------+
|0| 512| 512| 512|0:00:14.492690|cache + hashpart 512|
|1| 512| 512| 512|0:00:20.215408|cache + hashpart 512|
|2| 512| 512| 512|0:00:20.408070|cache + hashpart 512|
|3| 512| 512| 512|0:00:20.390267|cache + hashpart 512|
|4| 512| 512| 512|0:00:20.362354|cache + hashpart 512|
|5| 512| 512| 512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+
改进并不是那么令人印象深刻,但如果您有一个小的集群和大量的数据,仍然值得一试。
我认为这里的主要信息是分区很重要。 在某些情况下,它会自动处理(mllib
,sql
),但如果您使用低级操作,则需要自行负责。