在循环内使用sparkDF.write.saveAsTable()会导致作业之间的延迟呈指数级增长。

4
我需要在一个for循环中执行一组不同的Hive查询。
hc=HiveContext(sc)
queryList=[set of queries]
for i in range(0,X):
    hc.sql(queryList[i])
    sparkDF.write.saveAsTable('hiveTable', mode='append')

尽管这段代码在较小的X值上运行得像魔法一样,但当X>100时会出现问题。每次saveAsTable作业之间的延迟呈指数增长,但每个作业大致需要5秒钟的常数时间。
以下是我尝试过的解决方法,但都没有成功:
  1. 在for循环中添加一次gc.collect() (i%100==0)。但这会打破for循环。
  2. 关闭当前的Spark和Hive context (i%100==0),并创建新的context - 仍然无法解决问题。
  3. 使用yarn-cluster而不是yarn-client - 没有运气!
是否有类似的选项,例如我每次调用saveAsTable函数时都创建一个与hive的连接,并关闭它?或者清理driver?
1个回答

1
这是因为您使用了for循环,该循环在Spark驱动程序模式下执行,而不会分布在集群工作节点上,也就是说它没有利用并行性的优势或者没有在工作节点上执行。尝试使用带有分区的parallelize创建RDD,这将有助于在工作节点上生成作业。
或者,如果您只想处理hiveContext,可以创建全局HiveContext,例如:val hiveCtx = new HiveContext(sc),并在循环中重复使用。
您还可以在运行集群作业时更改/优化执行器的数量,以提高性能。

这份工作肯定是分布式的,因为我可以看到所有我请求的执行器上都在执行saveastable任务。此外,由于我查询的表被缓存在内存中,我可以看到所有数据部分都是进程本地的。顺便说一下,我已经编辑了代码,以澄清关于hivecontext重复使用的问题。实际上,运行单个查询-保存所需的时间平均只有5秒钟。问题在于连续作业之间的延迟。 - Mike
在当前情况下,每个保存作业都可以分布式处理,但不能迭代查询...查询的迭代列表仍然按顺序进行,这就是为什么您感觉问题出现在连续作业之间延迟的原因...尝试在查询循环上并行化。 - Nitin
我对作业按顺序执行没有问题。问题在于每次迭代之间的延迟增加。 - Mike

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接