如何将RDD[Row]转换为RDD[String]

6

我有一个叫做source的DataFrame,是从mysql中获取的表格。

val source = sqlContext.read.jdbc(jdbcUrl, "source", connectionProperties)

我已通过将其转换为RDD进行了转换。
val sourceRdd = source.rdd

但我需要的是RDD[String],而不是它的RDD[Row],以便进行诸如以下转换:

source.map(rec => (rec.split(",")(0).toInt, rec)), .subtractByKey(), etc..

谢谢你。

你能包含 source.printSchema 的输出吗?我想向你展示如何避免进入 RDD 级别(这是无论如何都不应该做的)。 - Jacek Laskowski
2个回答

7

您可以在map调用中使用Row. mkString(sep: String): String方法,如下所示:

val sourceRdd = source.rdd.map(_.mkString(","))

您可以按照自己的需要更改","参数。

希望这能帮到您,最好的问候。


如果字符串中有逗号“,”,那么你的方法可能会失败。 - T. Gawęda
@T.Gawęda 如果你在谈论我们想要创建的字符串,那么我们需要避免使用 "," 并将其更改为另一个分隔符。 - Haroun Mohammedi

3

你的数据库模式是什么?

如果它只是一个字符串,你可以使用:

import spark.implicits._
val sourceDS = source.as[String]
val sourceRdd = sourceDS.rdd // will give RDD[String]

注意:在Spark 1.6中使用sqlContext而不是spark - spark是SparkSession,它是Spark 2.0中的一个新类,并且是SQL功能的新入口点。在Spark 2.x中应该使用它而不是SQLContext。
您还可以创建自己的案例类。
此外,您可以映射行 - 这里源类型为DataFrame,我们在map函数中使用偏函数:
val sourceRdd = source.rdd.map { case x : Row => x(0).asInstanceOf[String] }.map(s => s.split(","))

我尝试了第一种方法,它抛出了错误——无法找到存储在数据集中的类型的编码器。导入sqlContext.implicits._ 支持基本类型(Int,String等)和产品类型(案例类)。将来的版本将添加支持序列化其他类型的功能。 方法as的参数不足:(implicit evidence$1: org.apache.spark.sql.Encoder[String])org.apache.spark.sql.Dataset[String]。未指定值参数evidence$1。 - Vickyster
对于第二种方法--value split不是Any的成员。 - Vickyster
import spark.implicits._ 未找到:对象spark - Vickyster
1
抱歉,更改为sqlContext。我使用的是Spark 2.0的SparkSession。 - T. Gawęda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接