如何优雅地在YARN上停止Spark Streaming应用程序?

9
我正在以集群模式在YARN上运行一个Spark Streaming应用程序,我尝试实现优雅的关闭,这样当应用程序被杀死时,它将在停止之前完成当前微批处理的执行。
根据一些教程,我已经配置了spark.streaming.stopGracefullyOnShutdowntrue,并且我已经添加了以下代码到我的应用程序:
sys.ShutdownHookThread {
   log.info("Gracefully stopping Spark Streaming Application")  
   ssc.stop(true, true)
   log.info("Application stopped")
}

然而,当我使用

yarn application -kill application_1454432703118_3558

杀掉应用程序时,此时执行的微批处理未完成。

驱动程序中,我看到打印了第一行日志(“优雅地停止Spark Streaming应用程序”),但没有打印最后一行日志(“应用程序已停止”)。

ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
INFO streaming.MySparkJob: Gracefully stopping Spark Streaming Application
INFO scheduler.JobGenerator: Stopping JobGenerator gracefully
INFO scheduler.JobGenerator: Waiting for all received blocks to be consumed for job generation
INFO scheduler.JobGenerator: Waited for all received blocks to be consumed for job generation
INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

执行者日志中,我看到以下错误:
ERROR executor.CoarseGrainedExecutorBackend: Driver 192.168.6.21:49767 disassociated! Shutting down.
INFO storage.DiskBlockManager: Shutdown hook called
WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.6.21:49767] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
INFO util.ShutdownHookManager: Shutdown hook called

我认为问题与YARN如何发送终止信号给应用有关。你有什么想法可以让应用程序优雅地停止吗?


你能解决它吗? - Gaurav Shah
不好意思,不行。 - Erica
2个回答

2

您应该转到执行页面以查看您的驱动程序在哪里运行(在哪个节点上)。ssh到该节点并执行以下操作:

ps -ef | grep 'app_name'

(将app_name替换为您的类名/应用程序名称)。它将列出几个进程。查看进程,有些将是其他进程的子进程。选择最父级进程的ID并发送SIGTERM信号。

kill pid

一段时间后,您会发现应用程序已经优雅地终止。 现在您不需要添加那些关闭的钩子了。 使用spark.streaming.stopGracefullyOnShutdown配置可以帮助平稳关闭。

这似乎是最好的选择。在Spark Streaming中使用检查点时,yarn application -kill不会杀死应用程序的根进程,即使在此之后,检查点仍将继续进行。 - Saif Charaniya
立即终止了应用程序,尝试了几次 :( - Gaurav Shah
@GauravShah,你指的是“那个”,是yarn kill命令还是Linux kill命令? - Amit Kumar
2
@amit_kumar yarn application -kill 使用 TERM 信号将其终止。 - Gaurav Shah

1
当触发自定义条件时,您可以通过调用 ssc.stop 来停止 Spark 流应用程序,而不是使用 awaitTermination。如下伪代码所示:
ssc.start()
while True:
    time.sleep(10s)
    if some_file_exist:
        ssc.stop(True, True)

虽然这可能回答了作者的问题,但它缺少一些解释性的词语和文档链接。裸代码片段没有周围的一些短语是不太有帮助的。您也可以发现如何撰写一个好的答案非常有帮助。请编辑您的答案。 - hellow

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接