什么是Spark作业?

45

我已经完成了Spark的安装并在设置主节点和工作节点后执行了一些测试用例。话虽如此,我对Spark上下文中“job”究竟是什么有很大的困惑(不是SparkContext)。 我有以下问题

  • “job”与Driver程序有什么不同。
  • 应用程序本身是Driver程序的一部分吗?
  • Spark submit 在某种程度上是一个job吗?

我阅读了Spark文档,但仍然对此一点不清楚。

话虽如此,我的实现是编写spark jobs {编程},这将进行spark-submit。

如果可能,请提供一些示例来帮助解释。这将非常有帮助。

注意: 请不要发布Spark链接,因为我已经尝试过了。即使问题听起来很幼稚,但我仍然需要更加清晰地理解。

2个回答

67

术语在不同上下文中有不同的含义,因此常常会很困难。在许多情况下,您可以使用“将作业提交到群集”这个术语,对于Spark来说,就是要提交一个驱动程序。

话虽如此,Spark有自己的“作业”定义,直接从词汇表中得出:

作业 由多个任务组成的并行计算,在Spark操作(例如保存、收集)的响应中生成;您将在驱动程序的日志中看到此术语。

因此,在这种情况下,假设您需要执行以下操作:

  1. 将包含人名和地址的文件加载到RDD1中
  2. 将包含人名和电话的文件加载到RDD2中
  3. 通过名称加入RDD1和RDD2,以获取RDD3
  4. 映射RDD3以获得每个人的漂亮HTML介绍卡,作为RDD4
  5. 保存RDD4到文件。
  6. 从RDD1映射以提取地址中的邮政编码,以获得RDD5
  7. 在RDD5上进行聚合,以获取居住在每个邮政编码上的人数总计,并生成RDD6
  8. 收集RDD6并将这些统计数据打印到标准输出。

因此,

  1. 驱动程序是整个代码段,运行所有8个步骤。
  2. 在第5步上生成整个HTML卡片集合是一个作业(因为我们使用了“保存”操作,而不是转换操作)。在第8步中也是一样的,使用了“收集”操作。
  • 其它步骤将被组织成阶段,每个作业都是一系列阶段的结果。对于简单的任务,可以只有一个阶段,但是需要重新分区数据(例如第3步上的连接)或任何打破数据局部性的操作通常会导致出现更多的阶段。您可以将阶段视为产生中间结果的计算,实际上可以持久化这些中间结果。例如,我们可以持久化RDD1,因为我们将使用它超过一次,避免重新计算。
  • 以上三点基本上都谈论了给定算法的逻辑如何被拆分。相比之下,任务是一个特定的数据片段,将在给定的阶段上,在给定的执行器上运行。
  • 希望这样能让事情更加清晰明了 ;-)


    1
    现在我明白了 :) 但是我仍然有一个关于如何编写作业调度的问题。我已经阅读了文档,但无法理解代码。 - chaosguru
    2
    那要看你使用的基础设施是什么(例如,你是否在使用Yarn上的Spark?)这不是我的强项,但原则上,我会从Bash脚本中启动所有驱动程序(以便记住参数、创建输出文件夹等)。任何能运行控制台命令的正常调度工具都应该可以使用。如果每个作业都使用集群中的所有资源,那么你只需提交程序,它们就会等待资源被释放。 - Daniel Langdon
    @DanielLangdon 第一步,将文件加载到RDD中也是一个任务吗? - akash patel
    2
    @akashpatel 不是这样的。一个工作意味着Spark操作(例如,保存,收集)和需要运行以评估该操作的任何任务。 - abhimanyu singh
    不好意思,能否有人确认关系是否总是一对一(1个操作=1个作业),即对于每个[Spark操作],只会触发1个作业,反之亦然。谢谢。 - Sarye Haddadi

    -2

    嘿,这是我以前做过的东西,希望对你有用:

    #!/bin/bash
    # Hadoop and Server Variables
    HADOOP="hadoop fs"
    HDFS_HOME="hdfs://ha-edge-group/user/max"
    LOCAL_HOME="/home/max"
    
    # Cluster Variables
    DRIVER_MEM="10G"
    EXECUTOR_MEM="10G"
    CORES="5"
    EXECUTORS="15"
    
    # Script Arguments
    SCRIPT="availability_report.py" # Arg[0]
    APPNAME="Availability Report" # arg[1]
    
    DAY=`date -d yesterday +%Y%m%d`
    
    for HOUR in 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23
    do
            #local directory to getmerge to
            LOCAL_OUTFILE="$LOCAL_HOME/availability_report/data/$DAY/$HOUR.txt"
    
            # Script arguments
            HDFS_SOURCE="webhdfs://1.2.3.4:0000/data/lbs_ndc/raw_$DAY'_'$HOUR" # arg[2]
            HDFS_CELLS="webhdfs://1.2.3.4:0000/data/cells/CELLID_$DAY.txt" # arg[3]
            HDFS_OUT_DIR="$HDFS_HOME/availability/$DAY/$HOUR" # arg[4]
    
            spark-submit \
            --master yarn-cluster \
            --driver-memory $DRIVER_MEM \
            --executor-memory $EXECUTOR_MEM \
            --executor-cores $CORES \
            --num-executors $EXECUTORS \
            --conf spark.scheduler.mode=FAIR \
            $SCRIPT $APPNAME $HDFS_SOURCE $HDFS_CELLS $HDFS_OUT_DIR
    
            $HADOOP -getmerge $HDFS_OUT_DIR $LOCAL_OUTFILE
    done
    

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接