Launching Apache Spark SQL jobs from a multi-threaded driver

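I want to use Spark to extract data from roughly 1500 remote Oracle tables. I'd like a multi-threaded application in which each thread picks one table (or maybe 10 tables) and launches a Spark job to read them. According to the official docs at https://spark.apache.org/docs/latest/job-scheduling.html, this should be possible: "The cluster managers that Spark runs on provide facilities for scheduling across applications. Within each Spark application, multiple 'jobs' (Spark actions) may be running concurrently if they were submitted by different threads. This is common if your application is serving requests over the network. Spark includes a fair scheduler to schedule resources within each SparkContext."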
However, you may notice that in the SO post Concurrent job Execution in Spark, a similar question has no accepted answer, and the most upvoted answer starts with:
"This is not really in the 'spirit' of Spark."
1. Everyone knows this isn't in the "spirit" of Spark.
2. Who cares about the spirit of Spark? That doesn't actually mean anything.
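Has anyone done something like this before? Did you have to do anything special? I just want some pointers before I waste a lot of working hours on a prototype. Thanks a lot for your help!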
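The mechanism quoted from the job-scheduling page can be sketched as below. This is a minimal sketch assuming local mode and made-up pool names (`pool0`, `pool1`, ...): `spark.scheduler.mode` is set to `FAIR` before the session starts, and each driver thread tags its jobs with the `spark.scheduler.pool` local property before running an action. `FairPoolsSketch` and `run` are illustrative names, not part of Spark.

```scala
import org.apache.spark.sql.SparkSession

object FairPoolsSketch {

  /** Starts `n` driver threads; each one tags its jobs with its own pool and runs one action. */
  def run(spark: SparkSession, n: Int): Vector[Long] = {
    val counts = new Array[Long](n)
    val threads = (0 until n).map { k =>
      new Thread(() => {
        // Local properties are per thread: only jobs submitted from this thread use this pool.
        // Pools not declared in a fairscheduler.xml file are created with default settings.
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", s"pool$k")
        counts(k) = spark.range(0L, 1000L * (k + 1)).count() // count() triggers one Spark job
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // join() also makes each thread's write to `counts` visible here
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("FairPoolsSketch")
      .config("spark.scheduler.mode", "FAIR") // must be set before the SparkContext starts; default is FIFO
      .getOrCreate()
    try println(run(spark, 3))
    finally spark.stop()
  }
}
```

Without the `FAIR` setting the same code still runs jobs from several threads, but the default FIFO mode lets the first submitted job take all free slots before later ones get any.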

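It's not in the spirit of Spark because the whole point of having a distributed pool of workers is to let the job manager schedule jobs on the cluster itself, not on the driver node. Why do you need multiple joblets/tasks per SparkContext instead of multiple drivers using the normal abstraction (driver -> worker -> driver -> actual worker)? Are you forced into yarn-client or standalone mode for some reason? - cowbert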
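Actually, yes, for now I'm forced into yarn-client mode, because I'm doing everything in Zeppelin. But even if I weren't, I'd rather have a single artifact (one driver), since all the jobs do exactly the same thing. At the very least it would be an interesting test to see whether the fair scheduler handles job scheduling better than I would. Also, as the Spark docs say, this is how web applications work, so it isn't that unusual. - uh_big_mike_boi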
I described the technique I use here: https://dev59.com/Wafja4cB1Zd3GeqPvGRP#47733522 - Raphael Roth
3 Answers


SparkContext is thread-safe, so it can be called from multiple threads in parallel. (I'm doing this in production.)

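One thing to watch out for is limiting the number of threads you run, because:
1. Executor memory is shared among all the threads, so you may hit OOM errors or constantly swap data in and out of the cache.
2. CPU is limited, so running more tasks than you have cores won't improve anything.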
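One way to cap the number of concurrent jobs, as the list above recommends, is to submit them from a fixed-size thread pool rather than one thread per table. The sketch below is plain Scala and runs without Spark; `BoundedJobRunner` and `runBounded` are hypothetical names, and in a driver each task would submit one Spark job (for example, reading one table).

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BoundedJobRunner {

  /** Runs every task, but never more than `maxConcurrent` of them at the same time.
    * Results come back in the same order as `tasks`. */
  def runBounded[A](tasks: Seq[() => A], maxConcurrent: Int): Seq[A] = {
    require(maxConcurrent > 0, "maxConcurrent must be positive")
    // Fixed-size pool: at most `maxConcurrent` tasks (and so Spark jobs) run concurrently.
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = tasks.map(task => Future(task()))
      // Blocks until all tasks are done; an exception thrown by a task is rethrown here.
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```

With 1500 tables, the remaining tables simply queue inside the pool until a thread frees up, so the cluster never sees more than `maxConcurrent` jobs from this driver at once.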


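The driver doesn't share executors with the tasks. The driver is the program that submits Spark jobs, which means the threads that submit jobs don't interfere with the threads that run tasks. Also, it is better to have more partitions to process than you have task slots; just make sure the executors are configured so they don't exceed the schedulable CPU and/or available memory. More but smaller partitions give a better distribution of tasks and make sure that one executor doesn't suddenly have to work much longer than another. - YoYo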
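The advice about having more, smaller partitions than task slots can be checked and enforced from the driver. This is a sketch only: `PartitionSizing` is a hypothetical helper, and the factor of 3 partitions per slot is an assumption based on the "2-3 tasks per CPU core" rule of thumb in Spark's tuning guide.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionSizing {

  // Hypothetical rule of thumb: `factor` partitions per task slot the application currently has.
  def targetPartitions(spark: SparkSession, factor: Int = 3): Int =
    spark.sparkContext.defaultParallelism * factor

  // Repartition only when the DataFrame has fewer partitions than the target (this causes a shuffle).
  def withAtLeast(df: DataFrame, target: Int): DataFrame =
    if (df.rdd.getNumPartitions < target) df.repartition(target) else df
}
```

For example, in `local[2]` mode `defaultParallelism` is 2, so the target is 6 partitions, and a single-partition DataFrame would be spread across 6.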

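You don't need to submit your jobs from one multi-threaded application (although I think you certainly could). Just submit your jobs as separate processes: write a script that submits all of them one after another and pushes each process into the background, or submit them in yarn-cluster mode.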
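The "one process per job" approach can be scripted in any language; to stay with the thread's Scala, here is a sketch using `scala.sys.process`. The jar name `read-table.jar`, the main class `com.example.ReadOracleTable`, and passing the table name as the only argument are all hypothetical; `--master yarn`, `--deploy-mode cluster` and `--class` are standard `spark-submit` options.

```scala
import scala.sys.process._

object SubmitPerTable {

  // Hypothetical application: a jar whose main class reads the one table named in args(0).
  val AppJar = "read-table.jar"
  val MainClass = "com.example.ReadOracleTable"

  /** One spark-submit command line per table. */
  def submitCommand(table: String): Seq[String] = Seq(
    "spark-submit",
    "--master", "yarn",
    "--deploy-mode", "cluster", // the driver runs inside the YARN cluster, not on this machine
    "--class", MainClass,
    AppJar,
    table // passed on to the application's main(args)
  )

  def main(args: Array[String]): Unit = {
    // run() starts the process and returns immediately, similar to `cmd &` in a shell.
    val launched = args.toSeq.map(table => table -> Process(submitCommand(table)).run())
    // exitValue() blocks until that spark-submit process exits.
    launched.foreach { case (table, p) => println(s"$table: spark-submit exited with ${p.exitValue()}") }
  }
}
```

Each table then becomes its own YARN application, and YARN decides which of them wait for resources.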
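Your scheduler (YARN, Mesos, Spark standalone cluster) will simply make some jobs wait, because it doesn't have enough room to run all of them at the same time given the available memory and/or CPU.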
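Note that I only see a benefit in your approach if you really process your tables with multiple partitions, and not with just a single partition, as I have seen many times. Also, since you need to process that many tables, I'm not sure how much benefit, if any, you will get. Depending on what you do with the table data, it may be simpler to run multiple single-threaded, non-Spark jobs.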
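Reading one Oracle table with several partitions, which this answer says is what makes the approach worthwhile, is done through the JDBC source options `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions`. The sketch below assumes a numeric key column; the URL, table, column, and credentials are placeholders, and the Oracle JDBC driver jar must be on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedJdbcRead {

  // Placeholder connection string; replace host, port and service name.
  val OracleUrl = "jdbc:oracle:thin:@//db-host:1521/ORCLPDB1"

  /** Options for a parallel read: Spark splits the read into up to `numPartitions` queries,
    * each covering one stride of `partitionColumn`. */
  def jdbcOptions(table: String, partitionColumn: String, lower: Long, upper: Long, numPartitions: Int): Map[String, String] =
    Map(
      "url" -> OracleUrl,
      "driver" -> "oracle.jdbc.OracleDriver",
      "dbtable" -> table,
      "partitionColumn" -> partitionColumn, // must be a numeric, date or timestamp column
      "lowerBound" -> lower.toString,
      "upperBound" -> upper.toString,
      "numPartitions" -> numPartitions.toString,
      "fetchsize" -> "10000" // the Oracle driver fetches only 10 rows per round trip by default
    )

  def read(spark: SparkSession, options: Map[String, String], user: String, password: String): DataFrame =
    spark.read
      .format("jdbc")
      .options(options)
      .option("user", user)
      .option("password", password)
      .load()
}
```

`lowerBound` and `upperBound` only decide the partition stride; they do not filter rows, so rows outside the range still land in the first and last partitions.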
See also @cowbert's comment.


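I agree with @lev. I had been wondering about this for a long time, so I wrote a small piece of code to make sure it works. Note: to control the number of workers per driver thread, you need to limit the DataFrame/Dataset with coalesce.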

Here is the sample code:

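import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Spark recommends an explicit main() over extending scala.App,
// because subclasses of scala.App may not work correctly.
object SparkMultiThreadExample {

  def main(args: Array[String]): Unit = {
    val TOTAL_WORKERS = 10               // local mode with 10 task slots
    val NUMBER_OF_WORKERS_PER_DRIVER = 2 // max parallel tasks per driver thread

    val sparkConf = new SparkConf()
      .setMaster(s"local[$TOTAL_WORKERS]")
      .setAppName("SparkMultiThreadExample")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val list1 = (0 until 10).toList

    // .par runs the outer loop on several driver threads; each thread submits its own Spark job.
    // On Scala 2.13, .par needs the scala-parallel-collections module and
    // import scala.collection.parallel.CollectionConverters._
    list1.par.foreach { t =>
      // Explicit function type, so Dataset.foreach(f: T => Unit) is the only applicable overload
      val work: Int => Unit = { i =>
        println(s"${Thread.currentThread()}, Driver thread $t: This is inside worker $i ")
        Thread.sleep(1000)
        println(s"FINISH ${Thread.currentThread()} Driver thread $t: This is inside worker $i ")
      }
      // coalesce(2) leaves 2 partitions, so each job occupies at most 2 of the 10 slots
      spark.createDataset(list1).coalesce(NUMBER_OF_WORKERS_PER_DRIVER).foreach(work)
    }

    spark.stop()
  }
}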

Output:

Thread[Executor task launch worker for task 0,5,main], Driver thread 0: This is inside worker 0 
Thread[Executor task launch worker for task 4,5,main], Driver thread 3: This is inside worker 0 
Thread[Executor task launch worker for task 7,5,main], Driver thread 5: This is inside worker 5 
Thread[Executor task launch worker for task 1,5,main], Driver thread 0: This is inside worker 5 
Thread[Executor task launch worker for task 3,5,main], Driver thread 2: This is inside worker 5 
Thread[Executor task launch worker for task 6,5,main], Driver thread 5: This is inside worker 0 
Thread[Executor task launch worker for task 2,5,main], Driver thread 2: This is inside worker 0 
Thread[Executor task launch worker for task 5,5,main], Driver thread 3: This is inside worker 5 
Thread[Executor task launch worker for task 9,5,main], Driver thread 4: This is inside worker 5 
Thread[Executor task launch worker for task 8,5,main], Driver thread 4: This is inside worker 0 
FINISH Thread[Executor task launch worker for task 0,5,main] Driver thread 0: This is inside worker 0 
FINISH Thread[Executor task launch worker for task 7,5,main] Driver thread 5: This is inside worker 5 
FINISH Thread[Executor task launch worker for task 4,5,main] Driver thread 3: This is inside worker 0 
FINISH Thread[Executor task launch worker for task 3,5,main] Driver thread 2: This is inside worker 5 
FINISH Thread[Executor task launch worker for task 1,5,main] Driver thread 0: This is inside worker 5 
Thread[Executor task launch worker for task 3,5,main], Driver thread 2: This is inside worker 6 
Thread[Executor task launch worker for task 4,5,main], Driver thread 3: This is inside worker 1 
Thread[Executor task launch worker for task 1,5,main], Driver thread 0: This is inside worker 6 
Thread[Executor task launch worker for task 0,5,main], Driver thread 0: This is inside worker 1 
Thread[Executor task launch worker for task 7,5,main], Driver thread 5: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 2,5,main] Driver thread 2: This is inside worker 0 
FINISH Thread[Executor task launch worker for task 5,5,main] Driver thread 3: This is inside worker 5 
Thread[Executor task launch worker for task 2,5,main], Driver thread 2: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 9,5,main] Driver thread 4: This is inside worker 5 
FINISH Thread[Executor task launch worker for task 6,5,main] Driver thread 5: This is inside worker 0 
Thread[Executor task launch worker for task 9,5,main], Driver thread 4: This is inside worker 6 
Thread[Executor task launch worker for task 5,5,main], Driver thread 3: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 8,5,main] Driver thread 4: This is inside worker 0 
Thread[Executor task launch worker for task 6,5,main], Driver thread 5: This is inside worker 1 
Thread[Executor task launch worker for task 8,5,main], Driver thread 4: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 3,5,main] Driver thread 2: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 4,5,main] Driver thread 3: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 1,5,main] Driver thread 0: This is inside worker 6 
Thread[Executor task launch worker for task 4,5,main], Driver thread 3: This is inside worker 2 
Thread[Executor task launch worker for task 3,5,main], Driver thread 2: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 0,5,main] Driver thread 0: This is inside worker 1 
Thread[Executor task launch worker for task 1,5,main], Driver thread 0: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 7,5,main] Driver thread 5: This is inside worker 6 
Thread[Executor task launch worker for task 0,5,main], Driver thread 0: This is inside worker 2 
Thread[Executor task launch worker for task 7,5,main], Driver thread 5: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 2,5,main] Driver thread 2: This is inside worker 1 
Thread[Executor task launch worker for task 2,5,main], Driver thread 2: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 9,5,main] Driver thread 4: This is inside worker 6 
Thread[Executor task launch worker for task 9,5,main], Driver thread 4: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 5,5,main] Driver thread 3: This is inside worker 6 
Thread[Executor task launch worker for task 5,5,main], Driver thread 3: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 6,5,main] Driver thread 5: This is inside worker 1 
Thread[Executor task launch worker for task 6,5,main], Driver thread 5: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 8,5,main] Driver thread 4: This is inside worker 1 
Thread[Executor task launch worker for task 8,5,main], Driver thread 4: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 4,5,main] Driver thread 3: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 7,5,main] Driver thread 5: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 0,5,main] Driver thread 0: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 1,5,main] Driver thread 0: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 3,5,main] Driver thread 2: This is inside worker 7 
Thread[Executor task launch worker for task 7,5,main], Driver thread 5: This is inside worker 8 
Thread[Executor task launch worker for task 4,5,main], Driver thread 3: This is inside worker 3 
Thread[Executor task launch worker for task 3,5,main], Driver thread 2: This is inside worker 8 
Thread[Executor task launch worker for task 0,5,main], Driver thread 0: This is inside worker 3 
Thread[Executor task launch worker for task 1,5,main], Driver thread 0: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 2,5,main] Driver thread 2: This is inside worker 2 
Thread[Executor task launch worker for task 2,5,main], Driver thread 2: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 9,5,main] Driver thread 4: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 5,5,main] Driver thread 3: This is inside worker 7 
Thread[Executor task launch worker for task 9,5,main], Driver thread 4: This is inside worker 8 
Thread[Executor task launch worker for task 5,5,main], Driver thread 3: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 6,5,main] Driver thread 5: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 8,5,main] Driver thread 4: This is inside worker 2 
Thread[Executor task launch worker for task 6,5,main], Driver thread 5: This is inside worker 3 
Thread[Executor task launch worker for task 8,5,main], Driver thread 4: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 7,5,main] Driver thread 5: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 4,5,main] Driver thread 3: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 0,5,main] Driver thread 0: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 3,5,main] Driver thread 2: This is inside worker 8 
Thread[Executor task launch worker for task 0,5,main], Driver thread 0: This is inside worker 4 
Thread[Executor task launch worker for task 3,5,main], Driver thread 2: This is inside worker 9 
Thread[Executor task launch worker for task 4,5,main], Driver thread 3: This is inside worker 4 
Thread[Executor task launch worker for task 7,5,main], Driver thread 5: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 1,5,main] Driver thread 0: This is inside worker 8 
Thread[Executor task launch worker for task 1,5,main], Driver thread 0: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 2,5,main] Driver thread 2: This is inside worker 3 
Thread[Executor task launch worker for task 2,5,main], Driver thread 2: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 9,5,main] Driver thread 4: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 5,5,main] Driver thread 3: This is inside worker 8 
Thread[Executor task launch worker for task 9,5,main], Driver thread 4: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 6,5,main] Driver thread 5: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 8,5,main] Driver thread 4: This is inside worker 3 
Thread[Executor task launch worker for task 5,5,main], Driver thread 3: This is inside worker 9 
Thread[Executor task launch worker for task 8,5,main], Driver thread 4: This is inside worker 4 
Thread[Executor task launch worker for task 6,5,main], Driver thread 5: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 0,5,main] Driver thread 0: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 4,5,main] Driver thread 3: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 3,5,main] Driver thread 2: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 7,5,main] Driver thread 5: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 1,5,main] Driver thread 0: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 2,5,main] Driver thread 2: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 9,5,main] Driver thread 4: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 5,5,main] Driver thread 3: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 6,5,main] Driver thread 5: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 8,5,main] Driver thread 4: This is inside worker 4 
Thread[Executor task launch worker for task 11,5,main], Driver thread 7: This is inside worker 5 
Thread[Executor task launch worker for task 10,5,main], Driver thread 7: This is inside worker 0 
Thread[Executor task launch worker for task 12,5,main], Driver thread 6: This is inside worker 0 
Thread[Executor task launch worker for task 13,5,main], Driver thread 6: This is inside worker 5 
Thread[Executor task launch worker for task 14,5,main], Driver thread 1: This is inside worker 0 
Thread[Executor task launch worker for task 15,5,main], Driver thread 1: This is inside worker 5 
Thread[Executor task launch worker for task 16,5,main], Driver thread 8: This is inside worker 0 
Thread[Executor task launch worker for task 17,5,main], Driver thread 8: This is inside worker 5 
Thread[Executor task launch worker for task 19,5,main], Driver thread 9: This is inside worker 5 
Thread[Executor task launch worker for task 18,5,main], Driver thread 9: This is inside worker 0 
FINISH Thread[Executor task launch worker for task 11,5,main] Driver thread 7: This is inside worker 5 
Thread[Executor task launch worker for task 11,5,main], Driver thread 7: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 10,5,main] Driver thread 7: This is inside worker 0 
Thread[Executor task launch worker for task 10,5,main], Driver thread 7: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 12,5,main] Driver thread 6: This is inside worker 0 
Thread[Executor task launch worker for task 12,5,main], Driver thread 6: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 13,5,main] Driver thread 6: This is inside worker 5 
Thread[Executor task launch worker for task 13,5,main], Driver thread 6: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 14,5,main] Driver thread 1: This is inside worker 0 
Thread[Executor task launch worker for task 14,5,main], Driver thread 1: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 15,5,main] Driver thread 1: This is inside worker 5 
Thread[Executor task launch worker for task 15,5,main], Driver thread 1: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 16,5,main] Driver thread 8: This is inside worker 0 
Thread[Executor task launch worker for task 16,5,main], Driver thread 8: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 17,5,main] Driver thread 8: This is inside worker 5 
Thread[Executor task launch worker for task 17,5,main], Driver thread 8: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 19,5,main] Driver thread 9: This is inside worker 5 
Thread[Executor task launch worker for task 19,5,main], Driver thread 9: This is inside worker 6 
FINISH Thread[Executor task launch worker for task 18,5,main] Driver thread 9: This is inside worker 0 
Thread[Executor task launch worker for task 18,5,main], Driver thread 9: This is inside worker 1 
FINISH Thread[Executor task launch worker for task 11,5,main] Driver thread 7: This is inside worker 6 
Thread[Executor task launch worker for task 11,5,main], Driver thread 7: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 10,5,main] Driver thread 7: This is inside worker 1 
Thread[Executor task launch worker for task 10,5,main], Driver thread 7: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 12,5,main] Driver thread 6: This is inside worker 1 
Thread[Executor task launch worker for task 12,5,main], Driver thread 6: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 13,5,main] Driver thread 6: This is inside worker 6 
Thread[Executor task launch worker for task 13,5,main], Driver thread 6: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 14,5,main] Driver thread 1: This is inside worker 1 
Thread[Executor task launch worker for task 14,5,main], Driver thread 1: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 15,5,main] Driver thread 1: This is inside worker 6 
Thread[Executor task launch worker for task 15,5,main], Driver thread 1: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 16,5,main] Driver thread 8: This is inside worker 1 
Thread[Executor task launch worker for task 16,5,main], Driver thread 8: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 17,5,main] Driver thread 8: This is inside worker 6 
Thread[Executor task launch worker for task 17,5,main], Driver thread 8: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 19,5,main] Driver thread 9: This is inside worker 6 
Thread[Executor task launch worker for task 19,5,main], Driver thread 9: This is inside worker 7 
FINISH Thread[Executor task launch worker for task 18,5,main] Driver thread 9: This is inside worker 1 
Thread[Executor task launch worker for task 18,5,main], Driver thread 9: This is inside worker 2 
FINISH Thread[Executor task launch worker for task 11,5,main] Driver thread 7: This is inside worker 7 
Thread[Executor task launch worker for task 11,5,main], Driver thread 7: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 10,5,main] Driver thread 7: This is inside worker 2 
Thread[Executor task launch worker for task 10,5,main], Driver thread 7: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 12,5,main] Driver thread 6: This is inside worker 2 
Thread[Executor task launch worker for task 12,5,main], Driver thread 6: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 13,5,main] Driver thread 6: This is inside worker 7 
Thread[Executor task launch worker for task 13,5,main], Driver thread 6: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 14,5,main] Driver thread 1: This is inside worker 2 
Thread[Executor task launch worker for task 14,5,main], Driver thread 1: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 15,5,main] Driver thread 1: This is inside worker 7 
Thread[Executor task launch worker for task 15,5,main], Driver thread 1: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 16,5,main] Driver thread 8: This is inside worker 2 
Thread[Executor task launch worker for task 16,5,main], Driver thread 8: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 17,5,main] Driver thread 8: This is inside worker 7 
Thread[Executor task launch worker for task 17,5,main], Driver thread 8: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 19,5,main] Driver thread 9: This is inside worker 7 
Thread[Executor task launch worker for task 19,5,main], Driver thread 9: This is inside worker 8 
FINISH Thread[Executor task launch worker for task 18,5,main] Driver thread 9: This is inside worker 2 
Thread[Executor task launch worker for task 18,5,main], Driver thread 9: This is inside worker 3 
FINISH Thread[Executor task launch worker for task 11,5,main] Driver thread 7: This is inside worker 8 
Thread[Executor task launch worker for task 11,5,main], Driver thread 7: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 10,5,main] Driver thread 7: This is inside worker 3 
Thread[Executor task launch worker for task 10,5,main], Driver thread 7: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 12,5,main] Driver thread 6: This is inside worker 3 
Thread[Executor task launch worker for task 12,5,main], Driver thread 6: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 13,5,main] Driver thread 6: This is inside worker 8 
Thread[Executor task launch worker for task 13,5,main], Driver thread 6: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 14,5,main] Driver thread 1: This is inside worker 3 
Thread[Executor task launch worker for task 14,5,main], Driver thread 1: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 15,5,main] Driver thread 1: This is inside worker 8 
Thread[Executor task launch worker for task 15,5,main], Driver thread 1: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 16,5,main] Driver thread 8: This is inside worker 3 
Thread[Executor task launch worker for task 16,5,main], Driver thread 8: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 17,5,main] Driver thread 8: This is inside worker 8 
Thread[Executor task launch worker for task 17,5,main], Driver thread 8: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 19,5,main] Driver thread 9: This is inside worker 8 
Thread[Executor task launch worker for task 19,5,main], Driver thread 9: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 18,5,main] Driver thread 9: This is inside worker 3 
Thread[Executor task launch worker for task 18,5,main], Driver thread 9: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 11,5,main] Driver thread 7: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 10,5,main] Driver thread 7: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 12,5,main] Driver thread 6: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 13,5,main] Driver thread 6: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 14,5,main] Driver thread 1: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 15,5,main] Driver thread 1: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 16,5,main] Driver thread 8: This is inside worker 4 
FINISH Thread[Executor task launch worker for task 17,5,main] Driver thread 8: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 19,5,main] Driver thread 9: This is inside worker 9 
FINISH Thread[Executor task launch worker for task 18,5,main] Driver thread 9: This is inside worker 4 

Content provided by Stack Overflow.