在yarn-cluster模式下,如果设置了num-executors,Spark Kafka Direct DStream需要多少个执行器和RDD分区?

20


我正在尝试使用Spark Kafka直接流方法。根据文档,它通过创建与kafka主题分区数量相同的RDD分区来简化并行处理。根据我的理解,Spark将为每个RDD分区创建一个执行器来执行计算。

因此,当我以yarn-cluster模式提交应用程序,并将选项num-executors指定为不同于分区数的值时,将有多少个执行器?

例如,有一个具有2个分区的kafka主题,我将num-executors指定为4:


export YARN_CONF_DIR=$HADOOP_HOME/client_conf

./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1

我尝试了一下并发现执行器的数量为4,每个执行器都从kafka读取和处理数据。为什么?Kafka主题中只有2个分区,那么4个执行器如何从仅有2个分区的kafka主题中读取呢?

下面是Spark应用程序和日志的详细信息。

我的Spark应用程序,它在每个执行器中(在flatMap方法中)打印接收到的来自kafka的消息:

    ...
    String brokers = args[0];
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
    kafkaParams.put("metadata.broker.list", brokers);

    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

    JavaPairDStream<String, Integer> wordCounts =
        messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
        {
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
            {
                System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(),
                    tuple._2())); // print the kafka message received  in executor
                return Arrays.asList(SPACE.split(tuple._2()));
            }

        }).mapToPair(new PairFunction<String, String, Integer>()
        {
            public Tuple2<String, Integer> call(String word) throws Exception
            {
                System.out.println(String.format("[word]: %s", word));
                return new Tuple2<String, Integer>(word, 1);
            }

        }).reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            public Integer call(Integer v1, Integer v2) throws Exception
            {
                return v1 + v2;
            }

        });

    wordCounts.print();

    Runtime.getRuntime().addShutdownHook(new Thread(){
        @Override
        public void run(){
            System.out.println("gracefully shutdown Spark!");
            jssc.stop(true, true);
        }
    });
    jssc.start();
    jssc.awaitTermination();

我的Kafka主题有2个分区。字符串"hello hello word 1", "hello hello word 2", "hello hello word 3"等被发送到该主题。

Topic: topic_2  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: topic_2  Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1
Topic: topic_2  Partition: 1    Leader: 1   Replicas: 1,2   Isr: 1,2

Web控制台enter image description here

执行器1的控制台输出:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...

执行程序2的控制台输出:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...

执行者3的控制台输出:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...

我打印每个RDD的分区数。它与kafka主题的分区数相同,在我的情况下为2。如何让3个执行程序并行处理总共有两个分区的一系列RDD?根据每个执行程序的控制台输出,所有执行程序都会处理来自RDD的数据。 - yzandrew
由于DStream是一系列RDD,因此在某些时间窗口内,这些RDD将在3个执行器中的2个中进行处理。而在另一个时间窗口内,这些RDD将在另外2个执行器中进行处理。我的理解正确吗? - yzandrew
1个回答

5
每个分区一次只能由一个执行器操作(假设您没有开启推测性执行)。 如果您的执行器数量超过分区数,那么并非所有执行器都会在任何给定的RDD上工作。但正如您所指出的,由于DStream是一系列RDD,随着时间的推移,每个执行器都将做一些工作。

如果分区数大于执行器数会发生什么? - Knight71
一旦执行器完成对一个分区的工作,它将被分配到另一个分区。 - Cody Koeninger
@CodyKoeninger 你好,我遇到了一个问题:假设有15个kafka分区和15个执行器,每个执行器都有8个核心。有时(在大多数情况下它工作得很好),只有3个执行器获得任务,因为3 * 8 > 15。但我想让每个执行器负责一个kafka分区。这可能吗?(即使我将rdd重新分区为196,其他执行器也不会获得任务。我正在使用spark 1.6.2) - 宇宙人
1
在较新版本的集成中,请参见LocationStrategies.PreferFixed:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies - Cody Koeninger
https://dev59.com/caTia4cB1Zd3GeqP8Rds - MaatDeamon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接