不使用spark submit部署Spark Driver应用程序

6
假设我们有一个像这样编写的Spark驱动程序:

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

我想在Yarn集群中运行,我是否可以避免使用spark-submit(当然假设我有一个集群节点的访问权限),只需在上下文中指定我要在Yarn上运行即可?换句话说,是否有可能将Spark客户端作为常规Java应用程序利用Yarn启动?


如果使用Scala编写,我知道你可以直接使用spark-shell -i file.scala --master yarn-client。但我认为Java不支持这种方式。 - OneCricketeer
4个回答

4

下面是另一种官方方法。

Spark Launcher - 用于启动Spark应用程序的库。

该库允许应用程序以编程方式启动Spark。库中只有一个入口点 - SparkLauncher类。

要启动一个Spark应用程序,只需实例化一个SparkLauncher并配置要运行的应用程序。例如:

 import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       Process spark = new SparkLauncher()
         .setAppResource("/my/app.jar")
         .setMainClass("my.spark.app.Main")
         .setMaster("local")
         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
         .launch();
       spark.waitFor();
     }
   }

你可以使用setConf方法设置所有与YARN相关的配置,并将主节点设置为yarn-clientyarn-cluster
参考文献: https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/launcher/package-summary.html

1

来自Hortonworks博客文章

1)将Spark汇编jar包复制到HDFS

默认情况下,Spark汇编jar文件在HDFS中不可用。为了进行远程访问,我们需要它。

HDP中一些标准位置是:

HDP 2.3.2:
    Version: 2.3.2.0-2950
    Spark Jar: /usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar
HDP 2.4.0:
    Version: 2.4.0.0-169
    Spark Jar: /usr/hdp/2.4.0.0-169/spark/lib/spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar

这是一次性的准备步骤,例如对于HDP 2.4,它将是:
sudo su - hdfs
HDP_VERSION=2.4.0.0-169
SPARK_JAR=spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar
hdfs dfs -mkdir "/hdp/apps/${HDP_VERSION}/spark/"
hdfs dfs -put "/usr/hdp/${HDP_VERSION}/spark/lib/$SPARK_JAR" "/hdp/apps/${HDP_VERSION}/spark/spark-hdp-assembly.jar"

2) 将您的Spark应用程序jar文件上传到HDFS

通过WebHdfs将使用sbt打包的Spark应用程序jar文件上传到HDFS中的项目文件夹(可能使用比“/tmp”更好的路径):

export APP_FILE=simple-project_2.10-1.0.jar
curl    -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project?op=MKDIRS"
curl -i -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/${APP_FILE}?op=CREATE&overwrite=true"
# take Location header from the response and issue a PUT request
LOCATION="http://..."
curl -i -X PUT -T "target/scala-2.10/${APP_FILE}" "${LOCATION}"

创建 Spark 属性文件并上传到 HDFS。
spark.yarn.submit.file.replication=3
spark.yarn.executor.memoryOverhead=384
spark.yarn.driver.memoryOverhead=384
spark.master=yarn
spark.submit.deployMode=cluster
spark.eventLog.enabled=true
spark.yarn.scheduler.heartbeat.interval-ms=5000
spark.yarn.preserve.staging.files=true
spark.yarn.queue=default
spark.yarn.containerLauncherMaxThreads=25
spark.yarn.max.executor.failures=3
spark.executor.instances=2
spark.eventLog.dir=hdfs\:///spark-history
spark.history.kerberos.enabled=true
spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port=18080
spark.history.fs.logDirectory=hdfs\:///spark-history
spark.executor.memory=2G
spark.executor.cores=2
spark.history.kerberos.keytab=none
spark.history.kerberos.principal=none

请按照之前的方法,将其作为spark-yarn.properties文件通过WebHDFS上传到simple-project文件夹中。

4) 创建一个Spark Job json文件

a) 我们需要构建启动Spark ApplicationMaster的命令:

java -server -Xmx1024m -Dhdp.version=2.4.0.0-169 \
     -Dspark.yarn.app.container.log.dir=/hadoop/yarn/log/rest-api \
     -Dspark.app.name=SimpleProject \
     org.apache.spark.deploy.yarn.ApplicationMaster \
     --class IrisApp --jar __app__.jar \
     --arg '--class' --arg 'SimpleProject' \
     1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr

重要的是提供Spark应用程序名称和HDP版本。Spark将解决。
b) 我们需要设置一些通用的环境变量。
JAVA_HOME="/usr/jdk64/jdk1.8.0_60/"
SPARK_YARN_MODE=true
HDP_VERSION="2.4.0.0-169" 

接下来,我们需要告诉Spark将哪些文件分发到所有Spark执行器。因此,我们需要设置4个变量。其中一个变量的格式为“#,#,...”,另外三个包含逗号分隔的时间戳、文件大小和每个文件的可见性(顺序相同):

SPARK_YARN_CACHE_FILES: "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar#__app__.jar,hdfs://<<name-node>>:8020/hdp/apps/2.4.0.0-169/spark/spark-hdp-assembly.jar#__spark__.jar"
SPARK_YARN_CACHE_FILES_FILE_SIZES: "10588,191724610"
SPARK_YARN_CACHE_FILES_TIME_STAMPS: "1460990579987,1460219553714"
SPARK_YARN_CACHE_FILES_VISIBILITIES: "PUBLIC,PRIVATE"

请将 <> 替换为正确的地址。文件大小和时间戳可以通过WebHDFS从HDFS中检索。

接下来,构建类路径。

CLASSPATH="{{PWD}}<CPS>__spark__.jar<CPS>{{PWD}}/__app__.jar<CPS>{{PWD}}/__app__.properties<CPS>{{HADOOP_CONF_DIR}}<CPS>/usr/hdp/current/hadoop-client/*<CPS>/usr/hdp/current/hadoop-client/lib/*<CPS>/usr/hdp/current/hadoop-hdfs-client/*<CPS>/usr/hdp/current/hadoop-hdfs-client/lib/*<CPS>/usr/hdp/current/hadoop-yarn-client/*<CPS>/usr/hdp/current/hadoop-yarn-client/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/tools/lib/*<CPS>/usr/hdp/2.4.0.0-169/hadoop/lib/hadoop-lzo-0.6.0.2.4.0.0-169.jar<CPS>/etc/hadoop/conf/secure<CPS>"

注意:- spark.jar 和 app.jar 与 SPARK_YARN_CACHE_FILES 中提供的相同。

  • Spark 将解析为:

c)创建 Spark 作业 json 文件

上述信息将作为命令和环境属性添加到 Spark json 文件中(详见附件-删除 .txt 结尾)

最后缺失的部分是所谓的 local_resources,它描述了 HDFS 中所有对于 Spark 作业必要的文件: - Spark 组装 jar(与缓存环境变量中相同) - 该项目的 Spark 应用程序 jar(与缓存环境变量中相同) - 该项目的 Spark 属性文件(仅适用于应用程序主节点,无需缓存)

所有这三个都需要以一种形式给出

{
  "key": "__app__.jar", 
  "value": {
    "resource": "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar", 
    "size": 10588, 
    "timestamp": 1460990579987, 
    "type": "FILE", 
    "visibility": "APPLICATION"
  }
}, 

请再次替换 <>。 时间戳、HDFS路径、大小和密钥需要与缓存环境变量相同。将其保存为spark-yarn.json(详见附件-删除.txt结尾)。提交作业前,请先从YARN请求应用程序ID。
curl -s -X POST -d '' \
     https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application
# {
#   "application-id": "application_1460195242962_0054",
#   "maximum-resource-capability": {
#     "memory": 8192,
#     "vCores": 3
#   } 
# }

编辑spark-yarn.json中的"application-id",然后提交作业:

curl -s -i -X POST -H "Content-Type: application/json" ${HADOOP_RM}/ws/v1/cluster/apps \
     --data-binary spark-yarn.json 
# HTTP/1.1 100 Continue
# 
# HTTP/1.1 202 Accepted
# Cache-Control: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Content-Type: application/json
# Location: http://<<resource-manager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054
# Content-Length: 0
# Server: Jetty(6.1.26.hwx)

此外,博客作者还提供了一个有用的Python助手,位于这个GitLab项目中。


1
这看起来比仅使用“spark-submit”要多做很多工作。反正这个JAR包必须要构建。 - OneCricketeer
@cricket_007 这就是为什么 spark-submit 隐藏了所有这些复杂性并为您完成工作的原因。 - Rakesh Rakshit
@cricket_007,我已经添加了一个答案,您可以利用spark-launcher库。 - Rakesh Rakshit
嗨@RakeshRakshit,看起来你只是从博客上复制了大部分内容。我已经编辑了你的帖子,为作者署名。在Stack Overflow上,抄袭并不受欢迎,感谢你提供这个答案! - Rick Moritz

0
你应该查看 org.apache.spark.deploy.yarn.Client,它提供了你想要使用的API。它允许你定义一个 SparkConf,命令行参数,然后使用 run() 方法将相应的作业提交到 Yarn。
我猜这就是你想要实现的。
我在另一个问题中找到了以下 tutorial,它给出了一个简短的实际示例。
或者,这里有另一篇 blog post,涵盖了这个问题。
这是 source code,供参考,因为API并没有完全记录。

虽然这个链接可能回答了问题,但最好在此处包含答案的基本部分并提供参考链接。如果链接页面更改,仅链接的答案可能会失效。- 来自审查 - Doron Yakovlev Golani
@DoronYakovlev-Golani,感谢您的提示,我已经添加了必要的信息,并提供了额外的参考资料,这应该作为一个起点。 - Rick Moritz

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接