Spark流处理Kafka消息未被消费

11

我想使用Spark(1.6.2)Streaming从Kafka(Broker版本0.10.2.1)中的一个主题接收消息。

我正在使用Receiver方法。 代码如下:

public static void main(String[] args) throws Exception
{
    SparkConf sparkConf = new SparkConf().setAppName("SimpleStreamingApp");
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));
    //
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put("myTopic", 1);
    //
    String zkQuorum = "host1:port1,host2:port2,host3:port3";
    //
    Map<String, String> kafkaParamsMap = new HashMap<>();
    kafkaParamsMap.put("bootstraps.server", zkQuorum);
    kafkaParamsMap.put("metadata.broker.list", zkQuorum);
    kafkaParamsMap.put("zookeeper.connect", zkQuorum);
    kafkaParamsMap.put("group.id", "group_name");
    kafkaParamsMap.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParamsMap.put("security.mechanism", "GSSAPI");
    kafkaParamsMap.put("ssl.kerberos.service.name", "kafka");
    kafkaParamsMap.put("key.deserializer", "kafka.serializer.StringDecoder");
    kafkaParamsMap.put("value.deserializer", "kafka.serializer.DefaultDecoder");
    //
    JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(javaStreamingContext,
                            byte[].class, byte[].class,
                            DefaultDecoder.class, DefaultDecoder.class,
                            kafkaParamsMap,
                            topicMap,
                            StorageLevel.MEMORY_ONLY());

    VoidFunction<JavaPairRDD<byte[], byte[]>> voidFunc = new VoidFunction<JavaPairRDD<byte[], byte[]>> ()
    {
       public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception
       {
          List<Tuple2<byte[], byte[]>> all = rdd.collect();
          System.out.println("size of red: " + all.size());
       }
    }

    stream.forEach(voidFunc);

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
}

访问Kafka需要进行Kerberos认证。我启动时使用以下命令:

spark-submit --verbose --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" --files jaas.conf,privKey.der --principal <accountName> --keytab <path to keytab file> --master yarn --jars <comma separated path to all jars> --class <fully qualified java main class> <path to jar file containing main class>

  1. kafkaParams哈希映射中包含的属性会在Kafka日志中触发VerifiableProperties类发出警告消息:
INFO KafkaReceiver: connecting to zookeeper: <the correct zookeeper quorum provided in kafkaParams map>

VerifiableProperties: Property auto.offset.reset is overridden to largest
VerifiableProperties: Property enable.auto.commit is not valid.
VerifiableProperties: Property sasl.kerberos.service.name is not valid
VerifiableProperties: Property key.deserializer is not valid
...
VerifiableProperties: Property zookeeper.connect is overridden to ....
我认为因为这些属性没有被接受,所以可能会影响流处理。 当我在集群模式下启动--master yarn时,这些警告消息就不会出现。
随后,我每5秒钟看到以下日志重复出现,如下所配置: INFO BlockRDD:从持久性列表中删除RDD 4 INFO KafkaInputDStream:在createStream处删除RDD BlockRDD [4]的块 INFO ReceivedBlockTracker:正在删除批次ArrayBuffer() INFO ... INFO BlockManager:删除RDD 4
然而,我并没有看到任何实际的消息打印在控制台上。
问题:为什么我的代码没有打印任何实际的消息?
我的Gradle依赖关系是:
compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version: '1.6.2'

1
你是要将Spark作业提交到远程集群吗? - Soheil Pourbafrani
是的,现在我正在使用选项“--master yarn”将其提交到远程。稍后我会更新一些日志。在代码开头删除了“setMaster(..)”API调用。 - RRM
你为执行器分配了多少个核心,有多少个执行器? - Knight71
请查看此链接 https://dev59.com/zfD0s4cB2Jgan1znYpUT#39587641 - Sree Eedupuganti
2个回答

0

stream 是 JavaPairReceiverInputDStream 的一个对象。将其转换为 Dstream,并使用 foreachRDD 打印从 Kafka 消费的消息。


之前的代码没有起作用,现在我面临着不同的问题。我会更新问题并附上发现。 - RRM

0

Spark 1.6.2不支持kafka 0.10,只支持kafka 0.8。如果要使用kafka 0.10,则应该使用spark 2。


1
如果是这样,为什么没有任何错误提示呢? - RRM

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接