我正在使用pyspark处理数据,最后需要使用rdd.collect()从rdd中收集数据。然而,由于内存问题,我的spark崩溃了。我尝试了很多方法,但都没有成功。现在我正在运行以下代码,为每个分区处理一小块数据:
def make_part_filter(index):
def part_filter(split_index, iterator):
if split_index == index:
for el in iterator:
yield el
return part_filter
for part_id in range(rdd.getNumPartitions()):
part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
myCollection = part_rdd.collect()
for row in myCollection:
#Do something with each row
我目前使用的新代码不会崩溃,但似乎一直在运行。
有没有更好的方法从大型RDD中收集数据?