在Spark中处理大量数据时如何使用reduceByKey?

8

我正在使用Spark运行reduceByKey。我的程序是Spark最简单的示例:

val counts = textFile.flatMap(line => line.split(" ")).repartition(20000).
                 .map(word => (word, 1))
                 .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")

但它总是会耗尽内存...

我正在使用50台服务器,每台服务器有35个执行器,每台服务器有140GB的内存。

文档的数量如下: 8TB的文档,200亿个文档,总共1000亿个字。 在减少后,大约将有1亿个单词。

我想知道如何设置spark的配置?

我想知道这些参数应该设置为什么值?

1. the number of the maps ? 20000 for example?
2. the number of the reduces ? 10000 for example?
3. others parameters?

2
你解决了吗?对于这么多数据的答案,知道了会很好。 - rubenafo
2个回答

5
如果您能发布日志将会很有帮助,但是另一个选项是在读取初始文本文件时指定更多的分区(例如sc.textFile(path, 200000)),而不是在读取后重新分区。另一个重要的问题是确保您的输入文件是可分割的(某些压缩选项使其无法分割,在这种情况下,Spark可能必须在单个机器上进行读取,从而导致OOMs)。
一些其他选项是,既然您没有缓存任何数据,那么可以减少Spark为缓存设置的内存量(由spark.storage.memoryFraction控制),另外,由于您只处理字符串元组,我建议使用org.apache.spark.serializer.KryoSerializer序列化程序。

1
你提到了“确保输入文件可分割”。那么我该如何确定我的S3上的.dat文件是否可分割呢?如果这不是正确的做法,我可以在论坛中发表另一个问题。 - sve
1
在这里发布了一个问题.gz vs flatfile - sve

0
你是否尝试使用partioner?它可以帮助减少每个节点的键数,如果我们假设平均1ko的键字权重,则每个节点独占100Go的内存用于键。通过分区,您可以大约将每个节点的键数减少到节点数,从而相应地减少每个节点所需的内存量。 @Holden提到的spark.storage.memoryFraction选项也是一个关键因素。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接