Is submitting a JAR necessary for a Spark application?


As the title says, I would like to know whether spark-submit *.jar is necessary.

I have been using DataStax Enterprise Cassandra for a while, but now I also need to use Spark. I have watched almost all of the videos from DS320: DataStax Enterprise Analytics with Apache Spark, but there is nothing about connecting to Spark remotely from a Java application.

I currently have 3 running DSE nodes. I can connect to Spark from the spark shell, but after trying for 2 days I have given up on connecting to Spark from Java code.

Here is my Java code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("AppName");
//sparkConf.set("spark.shuffle.blockTransferService", "nio");
//sparkConf.set("spark.driver.host", "*.*.*.*");
//sparkConf.set("spark.driver.port", "7007");
sparkConf.setMaster("spark://*.*.*.*:7077");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

And the result of the connection attempt:

16/01/18 14:32:43 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:32:43 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:03 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:33:03 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:23 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
16/01/18 14:33:23 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
16/01/18 14:33:23 WARN AppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
16/01/18 14:33:23 ERROR MapOutputTrackerMaster: Error communicating with MapOutputTracker
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:120)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:462)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    at org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1756)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1755)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:127)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:23 ERROR Utils: Uncaught exception in thread appclient-registration-retry-thread
org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:114)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:120)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:462)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    at org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1756)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1755)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:127)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:110)
    ... 18 more
16/01/18 14:33:23 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.
    at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:438)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:124)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have tried changing SPARK_MASTER_IP, SPARK_LOCAL_IP, and many other configuration variables, without success. Now I have found some articles about submitting jars to Spark, and I am not sure (I cannot find any evidence) whether that is the cause. Are spark-submit and the interactive shell the only ways to use Spark?

Are there any articles on this? I would really appreciate any hints you can give me.


I'm not sure whether this directly answers your question, but personally I use sbt assembly to produce a fat jar (it should also be possible with Maven), send it to the remote server via e.g. scp, and execute it there like any other jar file. - Rami
Use the dse spark-submit command; it picks up the environment variables automatically. - phact
2 Answers

I would highly recommend using dse spark-submit with DSE. While it is not required, it is definitely much easier than making sure that your security and classpath options are set up correctly for your cluster. In my opinion it also provides a much simpler way of configuring the SparkConf and placing jars on the executor classpath.
Within DSE it also automatically routes your application to the correct Spark master URL, which simplifies the setup further.
If you really want to construct your SparkConf manually, be sure to map the Spark master to the output of dsetool spark-master, or its equivalent in your version of DSE.
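
For illustration, here is a minimal sketch of such a manually built SparkConf. The master URL and driver host below are placeholders, not values confirmed for your cluster; substitute whatever dsetool spark-master reports, and a driver address the cluster can actually reach:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("AppName")
        // Placeholder: use the URL printed by `dsetool spark-master`,
        // not an address guessed from the node list.
        .setMaster("spark://10.0.0.1:7077")
        // Placeholder: the driver must advertise an address the master and
        // workers can reach (this matters when the driver sits outside the
        // cluster's network, e.g. behind Docker NAT).
        .set("spark.driver.host", "10.0.0.2");
JavaSparkContext sc = new JavaSparkContext(conf);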

Thanks for your reply! I am working from http://www.datastax.com/wp-content/uploads/resources/DataStax-WP-Best_Practices_Running_DSE_Within_Docker.pdf, but there is nothing in it about `dse spark-submit`. Assuming I don't need custom code at the start (and therefore no submit?), do you know whether there is anything else to do (more configuration?) to connect to Spark? - Marcin Lagowski
Depending on which security and connectivity options you use, it could be quite a lot. For example, if you use Kerberos you need the custom authenticator from the DSE build jars, so that has to be on the classpath with the correct hooks set up. This can also change from version to version... - RussS
I am not using Kerberos right now; I just want to get it working. It looks like the problem is the IP addresses I can see in the Spark WebUI. They are not my nodes' public IP addresses (which I use to connect) but internal Docker network addresses... and no variable I set changes that. - Marcin Lagowski
DSE Spark binds by default to the C* node's preferred IP. That is the listen address, unless you have set up your snitch to prefer external IPs (see GossipingPropertyFileSnitch or Ec2Snitch). You need to map the connection address to the internal address, or change how Spark connects. Of course, if you just run your application from inside Docker's network space, that is fine too. - RussS


You need a jar file so that the executors can run your custom code. You can set this jar with SparkConf.setJars. But it is not required for connecting to the Spark master and setting up a Spark application. Who knows, maybe you don't want to run any custom code. (That can be a reasonable situation with Spark SQL.)
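
A minimal sketch of that, assuming an assembled application jar at a placeholder path (the path must be valid on the machine the driver runs on):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf()
        .setAppName("AppName")
        // Placeholder master URL.
        .setMaster("spark://master-host:7077")
        // Ship the application jar to the executors so they can load your classes.
        // "/path/to/app.jar" is a placeholder for your assembled (fat) jar.
        .setJars(new String[] { "/path/to/app.jar" });
JavaSparkContext sc = new JavaSparkContext(sparkConf);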

You do not need to use spark-submit either.


I know nothing about DataStax, so it could be anything. But judging from the error message, your application may be trying to connect to the wrong host, or there may be a network issue. If you can reach the same Spark master with spark-shell from the same machine, that is of course not the case. Check the master's logs; maybe they can tell you why it closed the connection from your application.
