如何使用Spark结构化流与Kafka直接流?

11

我发现了Spark结构化流处理,其中有一个不断从S3桶中消费并将处理结果写入MySQL DB的示例。

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

这可以如何与 Spark Kafka Streaming一起使用?

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
有没有一种方法可以在不使用 "stream.foreachRDD(rdd => {})" 的情况下将这两个示例组合起来?
2个回答

12
有没有一种方法可以将这两个示例结合起来,而不使用stream.foreachRDD(rdd => {})
目前还没有。Spark 2.0.0没有针对Structured Streaming的Kafka sink支持。这是一个功能,应该会在Spark 2.1.0中推出,根据Spark Streaming的创建者Tathagata Das的说法这是相关的JIRA问题

编辑:(2018年11月29日)

是的,从Spark版本2.2开始,这是可能的。
stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

请点击SO post(read and write on Kafka topic with Spark streaming)了解更多相关编程内容。

编辑:(06/12/2016)

现在,Spark 2.0.2实验性地支持Kafka 0.10与Structured Streaming的集成,请参考expiramentaly supported in Spark 2.0.2

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

有没有一种方式可以追踪进度功能?例如Jira故事,功能请求等。 - SergeyB
1
@ike_love 是的,你可以在这里找到它:(https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-15406) - Yuval Itzchakov
请参考以下文档获取(Kafka)Alpha版本的相关信息:http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html - Murtaza Kanchwala

3

我也遇到了类似的问题,就是从Kafka源读取数据并将其写入Cassandra。我在这里创建了一个简单的项目kafka2spark2cassandra,如果有人需要可以分享一下。


@Sokia - 你的项目非常出色,整洁且自包含。谢谢。 - SergeyB

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接