使用join时,Spark迭代时间呈指数增长

25

我对Spark还比较新,尝试使用基于马尔可夫模型的迭代算法(期望最大化算法)进行聚类。因此,我需要进行多次迭代和连接操作。

我遇到的一个问题是,每次迭代时间呈指数级增长。
经过一些实验,我发现在进行迭代时需要持久化RDD以便在下一次迭代中重复使用,否则每次迭代Spark都会创建执行计划重新计算RDD,从而增加计算时间。

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
#     init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

结果为:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993

因此,添加cache()有助于使迭代时间保持恒定。

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)
0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368

但是,在迭代过程中进行Join操作会重新出现问题。下面是一些简单的代码示例,演示了这个问题。即使在每个RDD转换上进行缓存也无法解决这个问题:

但是,在循环内进行连接操作时,问题又出现了。以下是一些简单的代码,演示了这个问题。即使在每个RDD转换上进行缓存,也无法解决该问题:

init = sc.parallelize(xrange(10000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))
    init2.cache()

    init3 = init.map(lambda n: (n, n*2))
    init3.cache()

    init4 = init2.join(init3)
    init4.count()
    init4.cache()

    init = init4.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

以下是输出结果。正如您所看到的,迭代时间呈指数增长:(

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815

我会非常感激任何的帮助 :)


好的,我发现在交互中调用<action>会导致谱系增长的问题。因此,如果迭代次数是恒定的,则不需要在迭代之间缓存RDD,但这并不适用于聚类,因为我想要根据收敛性停止迭代。 - sashaostr
3个回答

29

摘要:

一般而言,迭代算法,特别是带有自连接或自联合的算法,需要控制以下内容:

此处描述的问题是前者缺乏的结果。在每次迭代中,自连接会导致分区数增加,从而形成指数模式。为了解决这个问题,您必须要么控制每次迭代中的分区数(请参阅下文),或使用全局工具,例如spark.default.parallelism(请参阅Travis提供的 答案)。总体而言,第一种方法通常提供更多的控制,并且不影响代码的其他部分。

原始回答:

据我所知,这里有两个交织在一起的问题-分区数量的增加和连接期间的洗牌开销。这两个问题都可以轻松处理,因此让我们逐步进行。

首先,让我们创建一个帮助程序来收集统计信息:

import datetime

def get_stats(i, init, init2, init3, init4,
       start, end, desc, cache, part, hashp):
    return {
        "i": i,
        "init": init.getNumPartitions(),
        "init1": init2.getNumPartitions(),
        "init2": init3.getNumPartitions(),
        "init4": init4.getNumPartitions(),
        "time": str(end - start),
        "timen": (end - start).seconds + (end - start).microseconds * 10 **-6,
        "desc": desc,
        "cache": cache,
        "part": part,
        "hashp": hashp
    }

另一个帮助处理缓存/分区的辅助程序

def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
    rdd = rdd if not part else rdd.repartition(npart)
    rdd = rdd if not hashp else rdd.partitionBy(npart)
    return rdd if not cache else rdd.cache()

提取流程逻辑:

def run(init, description, cache=True, part=False, hashp=False, 
    npart=16, n=6):
    times = []

    for i in range(n):
        start = datetime.datetime.now()

        init2 = procRDD(
                init.map(lambda n: (n, n*3)),
                cache, part, hashp, npart)
        init3 = procRDD(
                init.map(lambda n: (n, n*2)),
                cache, part, hashp, npart)


        # If part set to True limit number of the output partitions
        init4 = init2.join(init3, npart) if part else init2.join(init3) 
        init = init4.map(lambda n: n[0])

        if cache:
            init4.cache()
            init.cache()

        init.count() # Force computations to get time
        end = datetime.datetime.now() 

        times.append(get_stats(
            i, init, init2, init3, init4,
            start, end, description,
            cache, part, hashp
        ))

    return times

并创建初始数据:

ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()

如果没有提供numPartitions参数,则通过自身的连接操作根据输入RDD的分区数来调整输出的分区数。这意味着每次迭代都会增加分区数。如果分区数太多,事情就会变得很丑陋。您可以通过为连接或重新分区RDD提供numPartitions参数来解决这些问题。

timesCachePart = sqlContext.createDataFrame(
        run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4|          time|             desc|
+-+-----+-----+-----+--------------+-----------------+
|0|   16|   16|   16|0:00:01.145625|cache + partition|
|1|   16|   16|   16|0:00:01.090468|cache + partition|
|2|   16|   16|   16|0:00:01.059316|cache + partition|
|3|   16|   16|   16|0:00:01.029544|cache + partition|
|4|   16|   16|   16|0:00:01.033493|cache + partition|
|5|   16|   16|   16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+

如您所见,当我们重新分区时,执行时间基本保持不变。

第二个问题是数据被随机分区的。为了确保连接性能,我们希望在单个分区上具有相同的键。为了实现这一点,我们可以使用哈希分区器:

timesCacheHashPart = sqlContext.createDataFrame(
    run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+----------------+
|i|init1|init2|init4|          time|            desc|
+-+-----+-----+-----+--------------+----------------+
|0|   16|   16|   16|0:00:00.946379|cache + hashpart|
|1|   16|   16|   16|0:00:00.966519|cache + hashpart|
|2|   16|   16|   16|0:00:00.945501|cache + hashpart|
|3|   16|   16|   16|0:00:00.986777|cache + hashpart|
|4|   16|   16|   16|0:00:00.960989|cache + hashpart|
|5|   16|   16|   16|0:00:01.026648|cache + hashpart|
+-+-----+-----+-----+--------------+----------------+

执行时间与以前一样恒定,并且在基本分区的基础上有微小的改进。

现在只使用缓存作为参考:

timesCacheOnly = sqlContext.createDataFrame(
    run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()


+-+-----+-----+-----+--------------+----------+
|i|init1|init2|init4|          time|      desc|
+-+-----+-----+-----+--------------+----------+
|0|   16|   16|   32|0:00:00.992865|cache-only|
|1|   32|   32|   64|0:00:01.766940|cache-only|
|2|   64|   64|  128|0:00:03.675924|cache-only|
|3|  128|  128|  256|0:00:06.477492|cache-only|
|4|  256|  256|  512|0:00:11.929242|cache-only|
|5|  512|  512| 1024|0:00:23.284508|cache-only|
+-+-----+-----+-----+--------------+----------+

正如您所看到的,缓存版本的分区数量(init2、init3、init4)每次迭代都会翻一番,并且执行时间与分区数量成比例。

最后,我们可以通过使用哈希分区来检查是否可以在有大量分区时提高性能:

timesCacheHashPart512 = sqlContext.createDataFrame(
    run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
    "i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4|          time|                desc|
+-+-----+-----+-----+--------------+--------------------+
|0|  512|  512|  512|0:00:14.492690|cache + hashpart 512|
|1|  512|  512|  512|0:00:20.215408|cache + hashpart 512|
|2|  512|  512|  512|0:00:20.408070|cache + hashpart 512|
|3|  512|  512|  512|0:00:20.390267|cache + hashpart 512|
|4|  512|  512|  512|0:00:20.362354|cache + hashpart 512|
|5|  512|  512|  512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+

改进并不是那么令人印象深刻,但如果您有一个小的集群和大量的数据,仍然值得一试。

我认为这里的主要信息是分区很重要。 在某些情况下,它会自动处理(mllibsql),但如果您使用低级操作,则需要自行负责。


zero323,它完美地解决了我的问题,并提高了我对Spark的理解 - 非常感谢!!! - sashaostr
另外一个问题是,为什么要将分区数量从init = sc.parallelize(xrange(10000), ncores)加倍到init2 = init.map(lambda n: (n, n*3)).partitionBy(ncores * 2).cache() init3 = init.map(lambda n: (n, n*2)).partitionBy(ncores * 2).cache() init4 = init2.join(init3, ncores * 2).cache() - sashaostr
没有特别的原因,这只是我本地机器上的默认值。一般来说,你应该使用可用核心数量的两倍作为分区数的规则,但这取决于其他因素,如数据量、IO吞吐量等。 - zero323
@zero323,我在这个问题中遇到了与图形对象类似的问题,rdd的解决方案似乎不能用于图形顶点rdd。 - bourneli
当我使用flatMap对RDD进行操作时,我遇到了同样的问题,即每个条目产生两个项目。这使得分区数量翻倍。在我的情况下,紧接着是与另一个RDD进行union和groupByKey操作。我通过将groupByKey中的分区数设置为flatMap之前的原始分区数来解决了这个问题:groupByKey(orig_num_partitions) - Mack
不错的回答!但是对于数据框而言会发生什么情况,如果使用iterativeUnions而非Joins呢? - drkostas

8
问题在于(正如zero323在他详细的回答中所指出的),调用join而没有指定分区数可能会导致分区数量增加。分区数量可以无限制地增长(显然)。当反复调用join时,有(至少)两种方法可以防止分区数量无限增长。
方法1:
正如zero323所指出的,当调用join时,您可以手动指定分区数。例如:
rdd1.join(rdd2, numPartitions)

这将确保分区数不会超过numPartitions,特别是分区数不会不断增加。
方法2:
当您创建SparkConf时,可以指定默认的并行级别。如果设置了此值,则在调用像join这样的函数时,如果没有指定numPartitions,则将使用默认并行度,从而有效地限制分区数量并防止其增长。您可以设置此参数为:
conf=SparkConf.set("spark.default.parallelism", numPartitions)
sc = SparkContex(conf=conf)   

-2

RDDs是不可变的。尝试使用rdd = rdd.cache()进行缓存。


这里没有问题:rdd = sc.parallelize([]); rdd.is_cached; rdd.cache(); rdd.is_cachedcache() 实际上修改了 is_cached 字段。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接