Spark读取大文件

10
这可能是一个愚蠢的问题。我想确保我正确理解了这一点。 当你将一个巨大的文件(400GB)加载到一个集群中,其中集体执行器内存只有约120GB时,Spark似乎会一直读取。它不会崩溃,也不会启动第一个映射任务。
我认为正在发生的事情是,Spark正在以流的形式阅读大型文件,并在执行器耗尽内存时开始丢弃旧行。当.map代码开始执行时,这显然可能会成为一个问题,因为执行器jvm将再次从文件开头读回文件。但我想知道的是,Spark是否以类似于洗牌溢出机制的方式将数据溢出到硬盘上。
请注意,我没有提到缓存过程。这与使用sc.textFile(filename)进行初始读取有关。
1个回答

13

sc.textFile并不开始任何读取操作,它只是定义了一个驱动器驻留的数据结构,可以用于进一步处理。

只有在RDD上调用操作时,Spark才会构建执行所有必需变换(包括读取)的策略,然后返回结果。

如果运行序列时调用了一个操作,并且在读取后的下一个转换是映射,则Spark将需要读取文件的一小部分行(根据基于核心数的分区策略),然后立即开始映射,直到需要向驱动程序返回结果或在下一个转换序列之前进行洗牌。

如果您的分区策略(defaultMinPartitions)似乎使工作节点超载,因为您的分区(java表示)(在HDFS术语中称为InputSplit)比可用的执行器内存大,则需要指定要读取的分区数作为textFile的第二个参数。您可以通过将文件大小除以目标分区大小(考虑到内存增长)来计算理想的分区数。检查文件是否可以读取的简单方法是:

sc.textFile(file, numPartitions)
  .count()  

还有,请查看这个问题:如何在Spark中处理大规模数据的reduceByKey操作


我会强调检查是否实际调用了某个操作。此外,可能需要提到可以使用不同的持久化设置,如此处所示 - Mikel Urkia
我可以告诉你们,你们都走错了方向。在所有代码的最后,有一个.saveTextFile,它充当所需的操作。今天我重写了代码,用普通的RDD替换了DataFrame,在相同大小的数据和集群下一切正常。现在我想,这一切都与我编写的DataFrame代码有关。 - bhomass
真的希望有人能回答这个问题。我能找到Spark中缓存如何工作的相关信息,但找不到任何文件说明当初始读取超出总内存时会发生什么。有人知道吗? - bhomass
1
修改了我的回答,谈论了调整InputSplit大小以适应执行器内存的问题。当你接受免费帮助时,可以更加礼貌一些。 - Alister Lee
感谢您编辑后的回复。我并不是要无礼,只是不想将一个未解决问题标记为答案。新的分区大小回复更加针对原始问题。 - bhomass

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接