从一个Actor中进行Spark-Streaming

9
我希望一个消费者角色能够订阅Kafka主题并流式传输数据以便在使用Spark Streaming进行进一步处理,为什么要使用Actor? 因为我读过它的监督策略是处理Kafka故障的好方法(例如,在失败时重新启动)。
我发现了两个选项:
  • Java KafkaConsumer类:其poll()方法返回一个Map[String, Object]。 我想要像KafkaUtils.createDirectStream一样返回一个DStream,但不知道如何在角色外部获取该流。
  • 扩展ActorHelper特质并使用actorStream(),如此示例所示。 后一种选项没有显示连接到主题,而是连接到套接字。
有人可以指点我正确的方向吗?
1个回答

2

为了处理Kafka故障,我使用了Apache Curator框架和以下解决方法:

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
  * This method returns false if kafka or zookeeper is down.
  */ 
def isKafkaAvailable:Boolean = 
   Try {
      if (zk.isConnected) {
        val xs = client.getChildren.forPath("/brokers/ids")
        xs.size() > 0
      }
      else false
    }.getOrElse(false)

为了消费Kafka主题,我使用了com.softwaremill.reactivekafka库。例如:

class KafkaConsumerActor extends Actor {
   val kafka = new ReactiveKafka()
   val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

   override def preStart(): Unit = {
      super.preStart()

      val publisher = kafka.consume(config)
      Source.fromPublisher(publisher)
            .map(handleKafkaRecord)
            .to(Sink.ignore).run()
   }

   /**
     * This method will be invoked when any kafka records will happen.
     */
   def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
      // handle record
   }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接