在Apache Spark 1.3中向数据框添加列

54

是否可能并且有什么最有效的方法来添加一个列到数据框中?

更具体地说,该列可能作为现有数据框的行ID。

在简化的情况下,从文件中读取并不对其进行标记化,我可以考虑以下内容(在Scala中),但是它会在完成时出现错误(第3行),而且看起来也不像是最佳路线:

var dataDF = sc.textFile("path/file").toDF() 
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID") 
dataDF = dataDF.withColumn("ID", rowDF("ID")) 

你找到解决方案了吗? - trianta2
哪些是错误?那似乎是API中正确的方法。 - Chet
1
@Chet,“withColumn”应该与同一数据框架一起使用——也就是说,您可以像这样做:dataDF = dataDF.withColumn("ID", dataDF("ID").map(...)) 只能使用“this”数据框架的列,而不能使用其他数据框架的列。 - Oleg Shirokikh
@OlegShirokikh 啊,嗯。如果那个限制被记录下来就好了。那就是个有趣的问题。除此之外,我从API中能看到的唯一机制就是使用join。这是一个不错的候选项,可以建议加强API。 - Chet
@Chet - 是的,这是任何数据框架的核心功能之一。例如,它们具有一些内置功能来更新 Parquet 文件的模式。显然,在分布式环境中,这是非常昂贵的操作,但无论如何,在我看来,它应该存在。 - Oleg Shirokikh
你试过使用 UDF 吗?(类似于sqlContext.udf().register("... 的东西) - Thomas Decaux
4个回答

53

我发布这个问题已经有一段时间了,似乎还有其他人也想得到答案。以下是我发现的内容。

所以原始任务就是将一个带有行标识符的列(基本上是序列1到numRows)附加到任何给定的数据框中,以便可以跟踪行的顺序/存在(例如在抽样时)。 可以通过类似以下方式实现:

sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))

关于向任何数据框添加任何列的一般情况:

Spark API中最接近此功能的是withColumnwithColumnRenamed。根据Scala文档,前者通过添加列返回新的数据框。在我看来,这个定义有点令人困惑且不完整。这两个函数只能操作this数据帧(即当前数据帧),即给定两个数据帧df1df2,以及列col

val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL

因此,除非您能够将现有数据框中的列转换为所需的形状,否则无法使用withColumnwithColumnRenamed来附加任意列(独立的或其他数据框)。

如上所述,解决方法可能是使用join,虽然这可能会很麻烦 - 使用zipWithIndex将唯一键附加到两个数据框或列,类似于上面的方法可能有效。虽然效率可能不高...

很明显,在分布式环境中添加列到数据框并不是一项容易的功能,并且可能没有非常高效和简洁的方法。但我认为,即使有性能警告,拥有这个核心功能仍然非常重要。


1
最近有一些支持row_number函数的倡议(其中之一)-SPARK-7712JIRA票号没有明确提到,所以你可能需要查看相关的拉取请求。-related pull-request - rchukh
2
很好的回答!如果Spark知道我在连接排序键,则追加列功能可以进行优化。这将大大提高性能。 - WeiChing 林煒清
2
如果您想添加一个id列,您应该查看函数monotonically_increasing_id(),它可以在withColumn内使用。 - Michael Armbrust
1
monotonically_increasing_id()存在深层次的脆弱性问题,如果要使用它,必须非常小心。https://dev59.com/OlsV5IYBdhLWcg3w2h_R#35706321 - Paul

31

不确定它是否适用于spark 1.3,但在spark 1.5中我使用withColumn:

import sqlContext.implicits._
import org.apache.spark.sql.functions._


df.withColumn("newName",lit("newValue"))

当我需要使用与数据框中现有列无关的值时,我会使用这个。

这类似于@NehaM的答案,但更简单。


6

我参考了上面的回答,但是发现如果我们想要改变一个DataFrame,并且当前的API在Spark 1.6中有些不同,它还是不够完整的。zipWithIndex()返回一个包含每行和相应索引的(Row, Long)元组。我们可以根据需要使用它来创建新的Row

val rdd = df.rdd.zipWithIndex()
             .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show

我希望这对你有所帮助,以下是关于IT技术的相关内容:


4
你可以使用以下窗口函数行号来获取数据框中每一行的唯一标识。请注意,保留HTML标签。
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))

您也可以使用monotonically_increasing_id来实现相同的功能。
df.withColumn("ID", monotonically_increasing_id())

还有一些其他方法


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接