Spark UI DAG阶段断开连接

6
我在spark-shell中运行了以下作业:
val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect

Spark UI 显示三个阶段。第四和第五阶段对应于计算 d,第六阶段对应于计算 collect 操作。由于 d 已经被持续化,我期望只有两个阶段。然而,第五阶段存在但与其他阶段无关。

Spark UI DAG

尝试运行相同的计算,但不使用persist,DAG看起来完全一样,只是没有绿色点表示RDD已被持久化。

Spark UI DAG without persist

我希望第11阶段的输出连接到第12阶段的输入,但实际上并没有连接。
通过查看阶段描述,这些阶段似乎表明正在持久化d,因为第5阶段有输入,但我仍然不明白为什么第5阶段存在。

Spark UI stages

Spark UI stages without persist

2个回答

1
  1. Input RDD is cached and cached part is not recomputed.

    This can be validated with a simple test:

    import org.apache.spark.SparkContext
    
    def f(sc: SparkContext) = {
      val counter = sc.longAccumulator("counter")
      val rdd = sc.parallelize(0 until 100).map(i => {
        counter.add(1L)
        (i%10, i)
      }).persist
      rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
      counter.value
    }
    
    assert(f(spark.sparkContext) == 100)
    
  2. Caching doesn't remove stages from DAG.

    If data is cached corresponding stages can be marked as skipped but are still part of the DAG. Lineage can be truncated using checkpoints but it is not the same thing and it doesn't remove stages from visualization.

  3. Input stages contain more than cached computations.

    Spark stages group together operations which can be chained without performing shuffle.

    While part of the input stage is cached it doesn't cover all the operations required to prepare shuffle files. This is why you don't see skipped tasks.

  4. The rest (detachment) is just a limitation of the graph visualization.

  5. If you repartition data first:

    import org.apache.spark.HashPartitioner
    
    val d = sc.parallelize(0 until 1000000)
      .map(i => (i%100000, i))
      .partitionBy(new HashPartitioner(20))
    
    d.join(d.reduceByKey(_ + _)).collect
    

    you'll get DAG you're most likely looking for:

    enter image description here


0

补充user6910411的详细答案,RDD在首次运行并计算整个DAG之前不会被持久化到内存中,这是由于RDD的惰性评估。因此,当您第一次运行collect()时,RDD“d”将首次持久化到内存中,但不会从内存中读取任何内容。如果您第二次运行collect(),则会读取缓存的RDD。

此外,如果您对最终RDD执行toDebugString操作,则会显示以下输出:

    scala> d.join(d.reduceByKey(_ + _)).toDebugString
res5: String =
(4) MapPartitionsRDD[19] at join at <console>:27 []
 |  MapPartitionsRDD[18] at join at <console>:27 []
 |  CoGroupedRDD[17] at join at <console>:27 []
 +-(4) MapPartitionsRDD[15] at map at <console>:24 []
 |  |  ParallelCollectionRDD[14] at parallelize at <console>:24 []
 |  ShuffledRDD[16] at reduceByKey at <console>:27 []
 +-(4) MapPartitionsRDD[15] at map at <console>:24 []
    |  ParallelCollectionRDD[14] at parallelize at <console>:24 []

一个粗略的图形表示如下:RDD 阶段

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接