如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是Spark。

6

我该如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是在Spark中?

我看到了这个例子,但它需要SparkContext

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

在我的情况下,我有一个`StreamingContext`。那么我应该在`foreach`中创建`SparkContext`吗?这看起来太疯狂了...那么,如何解决这个问题呢?我的最终目标(如果可能的话)是将`DataFrame`保存在Amazon S3中,使用`rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")`,对于没有转换为`DataFrame`的`RDD`是不可能的(据我所知)。
myDstream.foreachRDD { rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()
}

请查看此链接 https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/08%20Write%20Output%20To%20S3.html - Shankar
@Shankar:他在哪里定义了AWS访问密钥? - Lobsterrrr
无论在foreachRDD中写什么都会在Driver上执行,因此您可以创建sqlContext并将rdd转换为DF,然后写入S3 - Shankar
@Shankar:我还是不太明白:我应该在foreachRDD之外创建StreamingContext和SparkContext吗?在你发布的示例中,我找不到sqlContext的定义。我尝试复制这个示例,但它给了我一个错误,说找不到sqlContext。我不想把事情搞得太复杂,所以我问最简单的解决方案。 - Lobsterrrr
2个回答

2
foreachRDD 之外创建 sqlContext,一旦使用 sqlContextrdd 转换为 DF,您就可以将其写入 S3。
例如:
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>

    val df = rdd.toDF()
    df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
}

更新:

即使在 foreachRDD 中创建的 sqlContext 将在驱动程序上执行,也可以正常运行。


我测试了这个例子。它指的是saveAsTextFile,显示无法解析符号。我使用的是Scala 2.11和Spark 1.6.2。 - Lobsterrrr
1
请尝试使用以下链接:http://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter - Rockie Yang
另一个问题是我收到了关于多个SparkContext的错误。我认为这是因为我同时拥有SparkContext和StreamingContext:val ssc = new StreamingContext(conf,Seconds(refreshingIntervalSeconds.toInt)) val sc = new SparkContext(conf) sc.hadoopConfiguration.set(“fs.s3n.impl”,“org.apache.hadoop.fs.s3native.NativeS3FileSystem”) sc.hadoopConfiguration.set(“fs.s3n.awsAccessKeyId”,Utils.getAWS_ACCESS_KEY()) sc.hadoopConfiguration.set(“fs.s3n.awsSecretAccessKey”,Utils.getAWS_SECRET_KEY()) val sqlContext = new SQLContext(sc) - Lobsterrrr
2
尝试使用 val ssc = new StreamingContext(sc, Seconds(refreshingIntervalSeconds.toInt)) - Rockie Yang

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接