我该如何在Spark Streaming
中将RDD
转换为DataFrame
,而不仅仅是在Spark
中?
我看到了这个例子,但它需要SparkContext
。
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
在我的情况下,我有一个`StreamingContext`。那么我应该在`foreach`中创建`SparkContext`吗?这看起来太疯狂了...那么,如何解决这个问题呢?我的最终目标(如果可能的话)是将`DataFrame`保存在Amazon S3中,使用`rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")`,对于没有转换为`DataFrame`的`RDD`是不可能的(据我所知)。
myDstream.foreachRDD { rdd =>
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
}
foreachRDD
中写什么都会在Driver上执行,因此您可以创建sqlContext
并将rdd
转换为DF
,然后写入S3
。 - ShankarforeachRDD
之外创建StreamingContext和SparkContext吗?在你发布的示例中,我找不到sqlContext
的定义。我尝试复制这个示例,但它给了我一个错误,说找不到sqlContext
。我不想把事情搞得太复杂,所以我问最简单的解决方案。 - Lobsterrrr