将RDD[String]转换为RDD[Row]以便于在Spark Scala中创建Dataframe

3

我正在读取一个包含许多空格的文件,并需要过滤掉这些空格。然后,我们需要将其转换为数据框。以下是示例输入。

2017123 ¦     ¦10¦running¦00000¦111¦-EXAMPLE

我解决这个问题的方法是编写以下函数,它将所有空格解析并修剪文件。

def truncateRDD(fileName : String): RDD[String] = {
    val example = sc.textFile(fileName)
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}

然而,我不确定如何将其转换成数据帧。 sc.textFile返回一个RDD[String]。我尝试了使用案例类,但问题在于我们有800个字段模式,案例类不能超过22个。
我在考虑将RDD [String]转换为RDD [Row],以便可以使用createDataFrame 函数。
val DF = spark.createDataFrame(rowRDD, schema)

有没有关于如何做到这一点的建议?
2个回答

8
首先将你的字符串分割 / 解析为字段。
使用 rdd.map( line => parse(line)),其中 parse 是某个解析函数。它可以是简单的 split,但你可能需要更健壮的方法。这将为您获取一个 RDD [Array[String]] 或类似结构的对象。
然后,您可以使用 rdd.map(a => Row.fromSeq(a)) 将其转换为 RDD[Row]
从那里,您可以使用 sqlContext.createDataFrame(rdd, schema) 将其转换为 DataFrame,其中 rdd 是您的 RDD [Row],schema 是您的模式 StructType。

我有一个嵌套的JSON数组需要解析,如何将其转换为数据框? - Nirmal_stack

0
在您的情况下,简单的方法如下:

val RowOfRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r))

如何解决使用scala 2.10时的productarity问题?

然而,我不确定如何将其转换为数据框。 sc.textFile返回一个RDD[String]。我尝试过使用case class的方法,但问题是我们有800个字段模式,case class无法超过22个。

是的,确实存在一些限制,例如productarity,但我们可以克服...您可以像下面的示例一样针对版本<2.11执行:

准备一个继承自Product并覆盖方法的case class。

像这样...

  • productArity():Int: 此函数返回属性的数量。在我们的情况下,它是33。因此,我们的实现如下:

  • productElement(n:Int):Any: 给定一个索引,此函数返回属性。为了保护,我们还有一个默认情况,会抛出一个IndexOutOfBoundsException异常:

  • canEqual (that:Any):Boolean: 这是三个函数中的最后一个,当进行类的相等性检查时,它作为边界条件:



从Scala 2.11版本开始,不再存在arity问题。在以下版本的Scala中,上述方法是适用的。 - Ram Ghadiyaram

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接