Spark处理gzip格式的大型文本文件

4
我正在运行一个Spark作业,它花费了太长的时间来处理输入文件。输入文件以Gzip格式为6.8GB,包含110M行文本。我知道它是Gzip格式,因此它不可分割,只有一个执行器将用于读取该文件。
作为我的调试过程的一部分,我决定查看将那个gzip文件转换为parquet需要多长时间。我的想法是,一旦我将其转换为parquet文件,然后如果我在该文件上运行我的原始Spark作业,那么它将使用多个执行器并且输入文件将被并行处理。
但即使是小的作业也比我预期的时间长。这是我的代码:
val input = sqlContext.read.text("input.gz")
input.write.parquet("s3n://temp-output/")

当我在我的笔记本电脑(16 GB RAM)上提取该文件时,只需要不到2分钟的时间。当我在Spark集群上运行它时,我的期望是它将花费相同甚至更少的时间,因为我正在使用的执行器内存为58 GB。但是它花了约20分钟。
我错过了什么?如果听起来很业余,我很抱歉,但我在Spark方面还是新手。
在gzip文件上运行Spark作业的最佳方法是什么?假设我没有创建其他文件格式(bzip2、snappy、lzo)的选项。

你好,你说parquet文件处理作业(从gzip到parquet花费了20分钟),是在driver上执行还是作业提交到集群上执行的?你可以通过查看该特定作业的spark-ui来检查并告诉我。如果它在集群上运行,它将显示集群上的多个节点。 - Ram Ghadiyaram
不,它是在集群上提交的。 - dreamer
1个回答

7
当进行输入-处理-输出类型的Spark作业时,需要考虑三个单独的问题:
1. 输入并行性 2. 处理并行性 3. 输出并行性
在你的情况下,由于你声称无法更改输入格式或粒度,因此输入并行性为1。
你也基本上没有进行任何处理,因此在那里无法获得任何收益。
但是,你可以控制输出并行性,这将给你带来两个好处:
1. 多个CPU将会写入,从而减少写操作的总时间。 2. 你的输出将被拆分成多个文件,允许你在后续处理中利用输入并行性。
要增加并行性,必须增加分区数,可以使用repartition()实现,例如:
val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")

选择最佳分区数时,需要考虑许多不同的因素。
- 数据大小 - 分区偏斜 - 集群 RAM 大小 - 集群中的核心数 - 后续处理的类型 - 用于后续处理的集群(RAM 和核心)的大小 - 正在写入的系统
如果不知道您的目标和限制,很难给出确定的建议,但是以下是一些通用指南:
- 如果您使用的节点具有足够的 I/O,则设置分区数等于执行器核心数将获得最快的吞吐量,因为您的分区不会出现偏斜。 - 在处理数据时,确保整个分区能够“适合”单个执行器核心分配的 RAM。这里的“适合”取决于您的处理方式。如果您正在进行简单的map转换,则数据可以流式传输。如果涉及排序,则 RAM 需求会大大增加。如果您使用 Spark 1.6+,则可以获得更灵活的内存管理。如果使用早期版本,则需要更加小心。当 Spark 开始向磁盘“缓冲”时,作业执行会停滞不前。磁盘大小和 RAM 大小可能非常不同。后者基于您处理数据的方式以及 Spark 可以从谓词下推(Parquet 支持该功能)中获得多少好处而变化。使用 Spark UI 查看各个作业阶段所需的 RAM。 - 除非您的数据具有非常特定的结构,否则不要硬编码分区数,因为这样会导致代码在不同大小的集群上运行效率低下。相反,可以使用以下技巧来确定集群中执行器的数量。然后,根据您使用的机器将其乘以每个执行器的核心数。
// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1

仅供参考,在我们团队中,我们使用相当复杂的数据结构,这意味着RAM大小远大于磁盘大小,我们的目标是将S3对象保持在50-250Mb范围内,以便在每个执行器核心具有10-20Gb RAM的节点上进行处理。

希望这可以帮到你。


感谢@Sim提供的详细信息,非常有帮助。现在我正在尝试根据您的评论使用不同的设置。仅供参考:我使用了3台r3.4xlarge机器作为EMR集群的核心。每个节点有16个vCPU和122 GB内存。Spark版本为1.6.1。我遵循了这篇文章:http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/来调整执行器/核心/内存数量,但没有得到预期的结果。在我尝试不同的设置时,您是否可以基于我刚刚分享的信息推荐其他设置?再次感谢。 - dreamer
你将如何处理持久化的数据? - Sim
我需要读取已持久化的数据(有1.1亿行字符串),并需要执行两个flapmap来创建一些值对,然后我需要执行reduceByKey按键对进行聚合。最后,我需要将数据映射到其他格式并计算一些其他值(联合,交集),最终将它们保存到Redshift中。它可以工作,但我正在尝试优化它。 - dreamer
重新分区帮助减少了时间,从23分钟缩短到了15/12分钟,其中读取花费了7.9分钟,写入花费了3.5分钟(60个分区),或者2分钟(使用120个分区)。 - dreamer
@dreamer 我很高兴这个有所帮助!根据你所描述的处理需求,你在分区方面有很大的灵活性。只需要保持分区数 >= 核心数。 - Sim

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接