我遇到了一个奇怪的问题。据我所知,Spark中操作的DAG仅在执行动作时才会执行。然而,我发现reduceByKey()(转换操作)开始执行DAG。
复现步骤。尝试使用以下代码:
SparkConf conf =new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context=new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist
JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1));
注意:文件的路径不应该是任何已存在的路径。换句话说,文件不应该存在。
如果您执行此代码,期望什么也不会发生。但是,如果您将以下行添加到程序并执行:
mapToPair.reduceByKey((x, y) -> x + y);
它会抛出以下异常:
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
这意味着 DAG 已经开始执行。由于 reduceByKey() 是一种转换操作,只有在执行诸如 collect() 或 take() 之类的动作时才会发生。
Spark 版本为 2.0.0,请提供您的建议。