在映射中创建Spark行

3
我看到了一篇关于Dataframes的教程,链接为https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html,该教程使用Python编写。我正在尝试将其翻译成Scala。
他们有以下代码:
df = context.load("/path/to/people.json")
# RDD-style methods such as map, flatMap are available on DataFrames
# Split the bio text into multiple words.
words = df.select("bio").flatMap(lambda row: row.bio.split(" "))
# Create a new DataFrame to count the number of words
words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF()
word_counts = words_df.groupBy("word").sum()

所以,我首先从一个 csv 文件中读取数据到一个名为 df 的数据框中,然后我有以下代码:
val title_words = df.select("title").flatMap { row =>    
  row.getAs[String("title").split(" ") }
val title_words_df = title_words.map( w => Row(w,1) ).toDF()
val word_counts = title_words_df.groupBy("word").sum()

但我不知道:

  1. 如何将字段名称分配给以val title_words_df = ...开头的行中的行

  2. 我遇到了错误“值toDF不是org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]的成员”

提前感谢您的帮助。

1个回答

2
如何将字段名称分配给行
Python的Row与其Scala对应对象有很大的不同。它是一个带有名称的元组,使其更等同于产品类型而不是无类型集合(o.a.s.sql.Row)。
我遇到了错误:“值toDF不是org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]的成员”。
由于o.a.s.sql.Row基本上是无类型的,因此不能与toDF一起使用,并且需要使用具有显式模式的createDataFrame。
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("word", StringType), StructField("cnt", LongType)
))

sqlContext.createDataFrame(title_words.map(w => Row(w, 1L)), schema)

如果您希望您的代码与Python版本等效,那么应该使用产品类型而不是Row。这意味着使用一个Tuple

title_words.map((_, 1L)).toDF("word", "cnt")

或者情况类:
case class Record(word: String, cnt: Long)

title_words.map(Record(_, 1L)).toDF

但实际上,使用RDDs是没有必要的:

import org.apache.spark.sql.functions.{explode, lit, split}

df.select(explode(split($"title", " ")), lit(1L))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接