Apache Spark:为什么reduceByKey转化执行DAG?

4

我遇到了一个奇怪的问题。据我所知,Spark中操作的DAG仅在执行动作时才会执行。然而,我发现reduceByKey()(转换操作)开始执行DAG。

复现步骤。尝试使用以下代码:

SparkConf conf =new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context=new JavaSparkContext(conf);

JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist

JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1));

注意:文件的路径不应该是任何已存在的路径。换句话说,文件不应该存在。

如果您执行此代码,期望什么也不会发生。但是,如果您将以下行添加到程序并执行:

mapToPair.reduceByKey((x, y) -> x + y);

它会抛出以下异常:
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

这意味着 DAG 已经开始执行。由于 reduceByKey() 是一种转换操作,只有在执行诸如 collect() 或 take() 之类的动作时才会发生。

Spark 版本为 2.0.0,请提供您的建议。


这行代码是什么意思?JavaRDD<String> textFile = context.textFile("任何不存在的路径"); // 这个路径不应该存在 - Aviral Kumar
context.textFile() 理想情况下会将 HDFS 或本地内容加载到 RDD 中。如果路径不存在,RDD 将如何形成? - Aviral Kumar
@AviralKumar 这个问题涉及到为什么代码会被执行,考虑到转换是惰性评估的。文件不存在并且在调用reduceByKey之后抛出异常表明,在转换之后某些东西正在被执行。 - ImDarrenG
@ImDarrenG:谢谢你的解释。 - Sourav Gulati
@AviralKumar:我之所以特别要求提供不存在的路径来解释DAG正在运行,即使它抛出异常,是因为我想说明转换仍在进行中。 - Sourav Gulati
@ImDarrenG,我理解错了,抱歉。不,洗牌本身并不执行DAG。它只是添加了额外的阶段。对于我的错误表示抱歉,并感谢您的指出 ;) - T. Gawęda
1个回答

2

这是因为实际上并不会执行整个DAG(有向无环图)。

发生的事情是reduceByKey需要一个Partitioner才能工作。如果您没有提供,则Spark将根据惯例和默认值创建一个。代码中的“默认分区器”如下所示:

/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If any of the RDDs already has a partitioner, choose that one.
*
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
* spark.default.parallelism is set, then we'll use the value from SparkContext
* defaultParallelism, otherwise we'll use the max number of upstream partitions.
*
* Unless spark.default.parallelism is set, the number of partitions will be the
* same as the number of partitions in the largest upstream RDD, as this should
* be least likely to cause out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/

这个定义意味着,在某些情况下,计算所有上游RDD的分区数量。在您的情况下,这意味着要求“文件系统”(可能是Hadoop,也可能是本地文件系统等)执行必要的操作(例如,对Hadoop FileSystem的单个调用可以返回多个文件,每个文件还可以根据其InputFormat定义的各种优化进行拆分,只有实际查找才能知道)。
因此,这里执行的是这个操作,而不是实际的DAG(例如,您的map/flatMap/aggregate等)。
您可以通过在此reduceByKey变体中提供自己的分区器来避免这种情况:
 reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

感谢您的回答。我尝试提供自定义分区器,现在它不会抛出错误了。但是,我仍然困惑为什么它会开始计算分区,即使没有执行任何操作。以下是参考的分区器代码: `Partitioner partitioner = new Partitioner() { final int maxPartitions=2; @Override public int numPartitions() {return maxPartitions; } @Override public int getPartition(final Object obj) { String obj1 =(String) obj; return obj1.hashCode() % maxPartitions; } }; ` - Sourav Gulati
因为分区器不是在“操作执行”时“即兴创建”的,而是在需要它们的转换创建时创建的(这更像是Spark的一种权衡而不是缺陷)。默认的分区器会尝试聪明地处理它(例如,如果它创建了少量分区,则可能导致内存不足;如果它创建了太多分区,则可能是浪费),但是聪明意味着获取上游RDD描述,您称之为“计算分区”。设置默认并行度参数将关闭该功能,如果这确实是您想要的。 - GPI
你手工制作的Partitionner看起来像Spark已经拥有的HashPartitionner:https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/HashPartitioner.html - GPI
谢谢你解决我的疑问。我只是想尝试相同的哈希分区逻辑。 - Sourav Gulati

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接