我已经按照相同的步骤设置了一个Spark独立集群。我能够调试驱动程序、主节点、执行器JVM。
主节点和工作节点配置在服务器级别的机器上。该机器有12个CPU核心。Spark-2.2.0的源代码已从Spark Git Repo克隆。
步骤:
1] 启动主JVM的命令:
root@ubuntu:~/spark-2.2.0-bin-hadoop2.7/bin
./spark-class -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8787 org.apache.spark.deploy.master.Master
Shell脚本spark-class用于手动启动主节点。第一个参数是以调试模式启动主节点的JVM参数。JVM会暂停并等待IDE进行远程连接。
以下是显示远程调试IDE配置的屏幕截图:
2] 启动Worker JVM的命令:
root@ubuntu:~/spark-2.2.0-bin-hadoop2.7/bin
与主节点(master)相同,最后一个参数指定了Spark主节点的地址。工作节点(worker)的调试端口为8788。
作为启动的一部分,工作节点会向主节点注册。
截图
3] 一个包含main方法的基本Java应用程序被编译并打包成uber/fat jar。这在文本“学习Spark”中已经解释过。简而言之,Uber jar包含所有传递依赖项。
通过在以下目录运行mvn package来创建:
root@ubuntu:/home/customer/Documents/Texts/Spark/learning-spark-master# mvn package
上述操作会在./target文件夹下生成一个jar包。
下面的屏幕截图是Java应用程序,将提交到Spark集群:
4] 提交命令以将命令提交到独立的集群。
root@ubuntu:/home/customer/Documents/Texts/Spark/learning-spark-master# /home/customer/spark-2.2.0-bin-hadoop2.7/bin/spark-submit --master spark://10.71.220.34:7077
--conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8790"
--conf "spark.executor.extraClassPath=/home/customer/Documents/Texts/Spark/
learning-spark-master/target/java-0.0.2.jar"
--class com.oreilly.learningsparkexamples.java.BasicMapToDouble
--name "MapToDouble"
./target/java-0.0.2.jar
spark://10.71.220.34:7077 → Argument to the java program →
com.oreilly.learningsparkexamples.java.BasicMapToDouble
· 上述命令来自于运行有主方法应用的客户端节点。然而,转换是在远程执行器 JVM上执行的。
·
–conf参数很重要。它们用于配置执行器JVM。执行器JVM由Worker JVM在运行时启动。
· 第一个conf参数指定Executor JVM应该以调试模式启动并立即暂停。它在端口8790上启动。
· 第二个conf参数指定执行器类路径应包含提交给执行器的特定应用程序jar。在分布式设置中,这些jar需要移动到Executor JVM机器上。
· 最后一个参数由客户端应用程序用于连接Spark主节点。
要了解客户端应用程序如何连接到Spark集群,我们需要对客户端应用程序进行调试并逐步执行它。为此,我们需要将其配置为以调试模式运行。
要调试客户端,我们需要编辑脚本spark-submit,如下所示:
从spark-submit中获取的内容
exec "${SPARK_HOME}"/bin/spark-class -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8789 org.apache.spark.deploy.SparkSubmit "$@"
5] 客户端注册后,工作线程在运行时启动执行程序。
下面的截图显示了ExecutorRunner.scala类。
6] 现在我们使用IDE连接到分叉的执行程序JVM。执行程序JVM将在我们提交的应用程序中运行转换函数。
JavaDoubleRDD result = rdd.mapToDouble( → ***Transformation function/lambda***
new DoubleFunction<Integer>() {
public double call(Integer x) {
double y = (double) x;
return y * y;
}
});
7] 只有当执行“collect”操作时,转换函数才会运行。
8] 下面的截图显示了在列表的多个元素上并行调用mapToDouble函数时,执行器视图的情况。由于有12个核心,执行器JVM将函数在12个线程中执行。由于没有在命令行上设置核心数,工作节点默认设置了选项:-cores=12。
9] 屏幕截图显示客户端提交的代码[maptodouble()]在远程分叉的Executor JVM中运行。
10] 所有任务执行完毕后,Executor JVM退出。客户端应用程序退出后,工作节点解除阻止并等待下一次提交。
参考资料
我已经创建了一个博客,描述了如何调试这些子系统的步骤。希望能对其他人有所帮助。
概述步骤的博客:
https://sandeepmspark.blogspot.com/2018/01/spark-standalone-cluster-internals.html