在Spark中将DataFrame转换为RDD的成本

3
我正在尝试使用以下方法获取数据框的分区数量:
df.rdd.getNumPartitions.toString

但是当我监视Spark日志时,我发现它会启动许多阶段,并且是一个代价高昂的操作。 输入图像描述 根据我的理解,DataFrame通过元数据向RDD添加了一个结构层。那么,在转换为RDD时剥离它需要这么长时间吗?

1个回答

7

DataFrame是一种经过优化的分布式表格集合。由于它保持了表格格式(类似于SQL表),因此可以维护元数据,从而允许Spark在后台执行一些优化。

这些优化是由一些辅助项目(如CatalystTungsten)执行的。

RDD不维护任何模式,如果需要,您需要提供一个模式。因此,RDD不像DataFrame那样高度优化(Catalyst根本没有参与)。

将DataFrame转换为RDD会强制Spark循环遍历所有元素,将它们从高度优化的Catalyst空间转换为scala空间。

请查看来自.rdd的代码。

  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

@transient private lazy val rddQueryExecution: QueryExecution = {
    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized)
  }

首先,它执行计划并将输出作为RDD [InternalRow]检索出来,正如名称所示,仅供内部使用,需要转换为RDD [Row] 然后循环遍历所有行并将它们转换。如您所见,不仅仅是删除模式。
希望这回答了您的问题。

啊,好的。那么,你的意思是这个时间与数据框的庞大程度成正比吗? 另外,如果我仍想知道数据框的分区数,有没有办法避免这种代价? - Pritam Pathak
你可以尝试先缓存数据框,这样在将其转换为RDD之前就已经计算完成了。我认为这至少会有所帮助。 - SCouto

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接