从Spark RDD收集大数据集的最佳实践是什么?

4

我正在使用pyspark处理数据,最后需要使用rdd.collect()从rdd中收集数据。然而,由于内存问题,我的spark崩溃了。我尝试了很多方法,但都没有成功。现在我正在运行以下代码,为每个分区处理一小块数据:

def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter


for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
          #Do something with each row

我目前使用的新代码不会崩溃,但似乎一直在运行。

有没有更好的方法从大型RDD中收集数据?


1
为什么不使用map函数而不是在RDD的列表格式上运行for循环? - Saif Charaniya
实际上,我需要收集RDD中的所有数据并存储在一个大数组中,然后将其提供给机器学习模块。 - JamesLi
1
机器学习模块是否接受迭代器,还是只接受数组?使用迭代器可以避免一次性将所有数据加载到内存中。即便如此,我仍然担心性能问题,因为我假设机器学习模块将使用单个线程“消耗”数据。 - E.F.Walker
我们在谈论哪种机器学习算法?Spark的理念是以分布式方式运行它。 - z-star
我真的很惊讶这个功能没有内置,或者至少没有更多地询问。我有一个巨大的RDD,我想将其附加到磁盘上的文件中,但我的主节点无法将整个RDD装入内存! - sudo
2个回答

4

我不知道这是否是最好的方法,但这是我尝试过的最好的方法。不确定它比你的更好还是更差。同样的想法,将其分成块,但您可以更灵活地选择块大小。

def rdd_iterate(rdd, chunk_size=1000000):
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size

举个例子,我想将一个巨大的RDD追加到磁盘上的CSV文件中,而不必将整个RDD填充到Python列表中:

def rdd_to_csv(fname, rdd):
    import csv
    f = open(fname, "a")
    c = csv.writer(f)
    for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD
        c.writerows([row])
    f.close()

rdd_to_csv("~/test.csv", my_really_big_rdd)

1
我正在尝试解决一个类似的问题,而且我发现你的代码很有帮助!谢谢! - Nahuel Chaves
不用谢。我记得在以前的工作中使用过那段代码,它非常可靠。 - sudo

2
尝试“收集”一个巨大的RDD是有问题的。 “Collect”返回一个列表,这意味着整个RDD内容必须存储在驱动程序的内存中。 这是一个“致命”的问题。 通常,人们希望Spark应用程序能够处理数据集,其大小远远超出单个节点内存的容量。
假设RDD刚好适合内存,并且“collect”有效。 那么我们又有了另一个“致命”的问题-低性能。 在您的代码中,收集的RDD在循环中进行处理:“for row in myCollection”。 这个循环由一个核心执行。 因此,与通过RDD处理数据(其计算分布在群集的所有核心之间,可能有数百个甚至数千个)相比,所有整个数据集的工作都被放置在单个核心的背上。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接