如何在Spark中强制DataFrame求值

27
有时(例如测试和基准测试)我想强制执行在数据框上定义的转换。据我所知,调用像count这样的操作并不能保证所有Columns都被实际计算,show可能只计算了所有Rows的子集(见下面的示例)。
我的解决方案是使用df.write.saveAsTableDataFrame写入HDFS,但这会在我的系统中“杂乱无章”地生成我不想再保留的表。
那么触发DataFrame评估的最佳方法是什么?
请注意,spark开发人员列表上也有关于count是否会始终触发每个行的评估的最近讨论:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html 我做了一个小的例子,展示了DataFrame上的count并不会评估所有内容(使用Spark 1.6.3和spark-master=local[2]进行测试)。
val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception

使用相同的逻辑,这里是一个示例,show 不会评估所有行:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception

编辑2:对于Eliasah:异常信息如下:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.

1
我的回答会帮助你,因为你的假设是错误的。https://dev59.com/al0Z5IYBdhLWcg3wfge-#31384084 - eliasah
2
根据这个讨论(http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html,参见Matei Zaharia的帖子),在数据框上使用count似乎不会评估所有列。但如果这不是真的,我会很高兴。 - Raphael Roth
对不起,我不相信他所说的也是正确的。 - eliasah
让我们在聊天中继续这个讨论 - eliasah
你提供的讨论链接非常有趣。 - eliasah
显示剩余6条评论
4个回答

24
有点晚了,但这是最根本的原因:countRDDDataFrame 上的行为不同。
DataFrame 中有一个优化,当你不需要加载数据实际上就可以知道元素数量时(特别是在没有数据混洗涉及的情况下),会使用这个优化。因此当调用 count 时,将不会加载任何数据并且不会抛出异常。你可以通过定义自己的 DefaultSourceRelation 进行实验,可以看到无论选择了多少列,在 DataFrame 上调用 count 将总是最终在方法 buildScan 中结束,并且没有需要的列(参考 org.apache.spark.sql.sources.interfaces 以了解更多)。它实际上是非常高效的优化 ;-)
然而,在 RDD 中却没有这样的优化(这就是为什么应该尽可能使用 DataFrame 的原因)。因此,在 RDD 上调用 count 会执行所有的血统,并返回组成任何分区的迭代器的大小之和。
调用 dataframe.count 进入第一个解释,但是调用 dataframe.rdd.count 则进入第二个解释,因为你已经从 DataFrame 构建了一个 RDD。请注意,调用 dataframe.cache().count 强制 dataframe 被实例化,因为你要求 Spark 缓存结果(因此需要加载所有数据并对其进行转换)。但是它会有缓存数据的副作用...

4
“缓存”不是建议将数据放入内存以提高性能,而是强制性要求这样做的吗? - user238607

20
我猜从DataFrame中获取底层的rdd,然后对其触发一个操作应该就能达到你所需的效果。
df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions

12
这很奇怪。看起来这个答案对原问题有用,但是有些人仍然在没有提及任何理由或反馈的情况下给了几个负评。 :) - Sachin Tyagi

4

看起来 df.cache.count 是最好的选择:

scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i})
myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))

scala> val df = sc.parallelize(1 to 1000).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.withColumn("test",myUDF($"id")).show(10)
[rdd_51_0]
+---+----+
| id|test|
+---+----+
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
| 10|  10|
+---+----+
only showing top 10 rows

scala> df.withColumn("test",myUDF($"id")).count
res13: Long = 1000

scala> df.withColumn("test",myUDF($"id")).cache.count
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
.
.
.
Caused by: java.lang.RuntimeException

Source


2
虽然这似乎有效,但它有一个副作用(即缓存)。 - Raphael Roth
1
Spark并非总是需要执行您打算执行的所有操作才能进行计数,有时这种方法也可以起作用。 - Dan Ciborowski - MSFT

3

我更喜欢使用df.save.parquet()。这会增加磁盘读写时间,但您可以稍后估算并将其减去,但是您可以确信Spark执行了您预期的每个步骤,并没有通过惰性评估欺骗您。


我该如何估计磁盘I/O时间,丹?此外,collect函数会起作用吗?还是它也会增加一些开销时间? - Anmol Deep
1
我假设你有一个名为“df”的数据框,并且正在对其执行“X”个操作。 要获取基准时间,只需执行df.save.parquet(),并计时。然后在df上执行你的方法,并执行df_prime.save.parquet()。第一个操作将让你知道仅保存df需要多长时间。 - Dan Ciborowski - MSFT

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接