我发现了Spark结构化流处理,其中有一个不断从S3桶中消费并将处理结果写入MySQL DB的示例。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
这可以如何与 Spark Kafka Streaming一起使用?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
有没有一种方法可以在不使用 "stream.foreachRDD(rdd => {})" 的情况下将这两个示例组合起来?