为什么连接失败并显示“java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]”错误信息?

77

我正在使用Spark 1.5。

我有两个数据框的形式:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF766,151条记录,而linkPersonItemLessThan500DF26,694,353条记录。请注意,我在linkPersonItemLessThan500DF上使用repartition(number),因为我打算稍后将它们连接起来。我接下来会执行以下代码:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))

我正在获取此输出的原因是:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
 at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
 at $iwC$$iwC$$iwC.<init>(<console>:93)
 at $iwC$$iwC.<init>(<console>:95)
 at $iwC.<init>(<console>:97)
 at <init>(<console>:99)
 at .<init>(<console>:103)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我不明白问题出在哪里。是简单的增加等待时间吗?连接太密集了吗?需要更多的内存吗?洗牌过于频繁吗?有人能帮忙吗?


让我提一下我对一个非常相似问题的回答:https://dev59.com/AVkS5IYBdhLWcg3wCCbf#48449467 - mathieu
5个回答

141

这是因为Spark尝试执行广播哈希连接,而其中一个DataFrame非常大,因此发送它会消耗很多时间。

您可以:

  1. 设置更高的spark.sql.broadcastTimeout以增加超时时间- spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist()两个DataFrames,然后Spark将使用Shuffle Join - 参考这里

PySpark

在PySpark中,您可以在构建Spark上下文时以以下方式设置配置:

spark = SparkSession
  .builder
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()

40

只是为了给T. Gawęda非常简洁的回答添加一些代码上下文。


在你的Spark应用程序中,Spark SQL选择了一种广播哈希连接进行连接,因为“libriFirstTable50Plus3DF有766,151个记录”,这比所谓的广播阈值(默认为10MB)要少。
您可以使用spark.sql.autoBroadcastJoinThreshold配置属性来控制广播阈值。 spark.sql.autoBroadcastJoinThreshold 配置了在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为-1,可以禁用广播。请注意,目前仅支持已运行命令ANALYZE TABLE COMPUTE STATISTICS noscan 的Hive Metastore表的统计信息。
您可以在堆栈跟踪中找到该特定类型的连接。

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)

在Spark SQL中,BroadcastHashJoin物理运算符使用广播变量将较小的数据集分发到Spark执行器(而不是在每个任务中都复制一份)。
如果您使用explain来查看物理查询计划,您会注意到查询使用了BroadcastExchangeExec物理运算符。这是您可以看到广播较小表的基础机制(以及超时)的地方。
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}

doExecuteBroadcast 是 Spark SQL 中每个物理运算符都遵循的 SparkPlan 合同的一部分,允许在需要时进行广播。BroadcastExchangeExec 恰好需要它。

timeout 参数是您要查找的内容。

private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}

如您所见,您可以完全禁用它(使用负值),这将意味着无限期等待广播变量被传输到执行程序,或者使用sqlContext.conf.broadcastTimeout,该属性与spark.sql.broadcastTimeout完全相同。默认值为5 * 60秒,您可以在堆栈跟踪中看到:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]


2
超时发生可能有不同的原因。其中之一是缺乏在集群上运行执行器的资源。可以使用spark.scheduler.minRegisteredResourcesRatio和spark.scheduler.maxRegisteredResourcesWaitingTime使执行等待直到资源可用。 - Eugene
1
@Eugene 我发现自己处于这样一个情况中,我没有进行任何连接,并且在我的堆栈跟踪中找不到任何连接,因此我倾向于相信您的评论是正确的。我该如何找出资源短缺是我的问题? - frammnm
@frammnm,我对你的情况很好奇。你能否提出一个单独的问题并包含执行计划?谢谢! - Jacek Laskowski
1
@JacekLaskowski 你好,我一直在使用EventHubs,发现当Spark Streaming一段时间内没有向EventHub发送消息时,会出现关于futures超时的异常。我认为这与处理EventHub的库有一个超时异常有关。希望这能帮到你。 - frammnm
@JacekLaskowski 感谢您的回答!persist避免了重新计算整个血统,但为什么它有助于减少广播时间,以便我们不会出现“广播超时”? - jack

8

除了增加 spark.sql.broadcastTimeout 或将两个DataFrames都进行persist()之外,您还可以尝试:

1.通过将 spark.sql.autoBroadcastJoinThreshold 设置为 -1 来禁用广播。

2.通过将 spark.driver.memory 设置为更高的值来增加Spark驱动程序内存。


3
如果错误是由于超时引起的,为什么spark.driver.memory可以帮助解决问题? - jack
1
@jack广播意味着整个广播数据集必须在发送给工作进程之前在驱动程序中收集。如果您的驱动程序内存不足,那么情况可能会变得糟糕。 - Logister

0
当我在循环中使用breeze的leastSquares函数时,我遇到了这个错误。Spark认为这是长时间运行的任务,并抛出了超时异常。解决办法是将该任务移动到自己的分布式循环中。

0
在我的情况下,这是由于对一个大数据框进行广播引起的:
df.join(broadcast(largeDF))

所以,根据之前的答案,我通过移除广播来修复它:

df.join(largeDF)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接