如何在Spark SQL的DataFrame中更改列类型?

181

假设我正在做这样的事情:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

但是我真的希望 year 是一个 Int(并且可能转换其他列)。

我能想到的最好方法是:

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

这有些复杂。

我的背景是R语言,我习惯于能够编写例如:

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

我可能漏掉了一些东西,因为在Spark/Scala中应该有更好的方法来实现这个...


1
我喜欢这种方式 spark.sql("SELECT STRING(NULLIF(column,'')) as column_string") - Eric Bellet
23个回答

155

编辑:最新版本

自 Spark 2.x 起在使用 Scala 时应改用 Dataset API。请在此处查看文档:

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

如果使用 Python,虽然更容易,但我还是留下链接,因为它是一个非常受欢迎的问题:

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]

[1] https://spark.apache.org/docs/latest/sql-programming-guide.html:

在Scala API中,DataFrame只是Dataset [Row]的类型别名。而在Java API中,用户需要使用Dataset来表示DataFrame。

编辑:最新版本

从Spark 2.x开始,您可以使用.withColumn。在此处检查文档:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

最早的答案

从Spark 1.4版本开始,您可以在列上应用具有DataType的cast方法:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

如果你正在使用SQL表达式,你也可以这样做:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

查看文档获取更多信息: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame


4
为什么你要用withColumn,接着又drop掉?直接使用原始列名进行withColumn不是更简单吗? - Ameba Spugnosa
5
不需要删除列再进行重命名,您可以在一行代码中完成操作:df.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))。此代码将更改列“ctr”的数据类型并将其添加回数据框中。 - ruhong
1
在这种情况下,是否会创建一个完整的新数据框副本来重新转换列?我有什么遗漏的吗?或者也许幕后有一些优化? - user1814008
1
@user1814008 也许你想查看 https://dev59.com/1F0a5IYBdhLWcg3wHlil#30691654 。在那里,你可以找到关于Spark转换和操作如何工作以及为什么应用转换不一定会创建一个新的数据框的深入解释。 - msemelman
6
根据 Spark 2.x文档df.withColumn(..) 方法可以根据传入的 colName 参数,添加或替换一列。 - y2k-shubham
显示剩余5条评论

91
< p > [EDIT:2016年3月:感谢您的投票!尽管如此,我认为基于 withColumn,withColumnRenamed 和 cast 的解决方案(由 msemelman,Martin Senne 和其他人提出)更简单,更清洁。]

我认为你的方法还可以,记住 Spark DataFrame 是一组行的(不可变)RDD,所以我们从来没有真正地替换列,只是每次创建一个具有新架构的新 DataFrame。

假设您拥有原始 df,其模式如下:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

还有一些针对一个或多个列定义的UDF:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

更改列类型或甚至从另一个DataFrame构建新的DataFrame可以像这样编写:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

这将产生:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

这与您自己的解决方案非常接近。只需将类型更改和其他转换保持为单独的udf val,可以使代码更易读和可重复使用。


32
这既不安全也不高效。对于安全性,单个 NULL 或格式错误的条目将导致整个任务崩溃。对于效率而言,UDF 对 Catalyst 来说并不透明。对于复杂操作使用 UDF 是可以的,但没有理由使用它们来进行基本类型转换。这就是为什么我们有 cast 方法 (请参见 Martin Senne 的 答案)。让事情对 Catalyst 透明需要更多的工作,但基本安全只是让 TryOption 起作用的问题。 - zero323
我没有看到任何关于将字符串转换为日期的相关内容,例如“05-APR-2015”。 - dbspace
4
有没有一种方式可以将您的withColumn()部分简化为通用部分,以遍历所有列? - Boern
感谢zero323,阅读此内容后我明白了为什么这里的UDF解决方案会崩溃。有些评论比一些SO上的答案更好 :) - user1972382
有没有办法可以知道哪些行是损坏的,也就是在转换过程中具有错误数据类型列的记录。因为转换函数会将这些字段设置为 null。 - Etisha

71

由于Spark Column支持cast操作(且我个人目前不赞成@Svend提出的udf),那么怎么样:

df.select( df("year").cast(IntegerType).as("year"), ... )

如何将值转化为请求的类型?一个好的副作用是,在这个意义上不可转换的值将变为null

如果需要将其作为辅助方法使用,请使用:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

被使用的方式如下:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )

2
你能给我建议吗?如果我需要对一堆列进行类型转换和重命名(我有50列,而且对Scala比较新,不确定最好的方法是什么,以避免创建大量的重复代码),该怎么做呢?有些列应该保持为字符串类型,有些则应该转换为浮点数。 - Dmitry Smirnov
如何将一个字符串转换为日期,例如在列中的"25-APR-2016"和"20160302"。 - dbspace
@DmitrySmirnov 你有收到答案吗?我也有同样的问题。 ;) - Evan Zamir
很遗憾 @EvanZamir,我最终还是得进行了大量操作才能在其他步骤中将数据用作 RDD。不知道现在是否变得更容易了 :) - Dmitry Smirnov

64

首先,如果您想进行类型转换,那么使用以下代码:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

如果列名相同,则该列将被替换为新列。您不需要执行添加和删除步骤。

其次,关于ScalaR的比较。这是我能想到最接近R的代码:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

虽然代码长度比R语言稍长,但这与语言的冗长无关。在R语言中,mutate是专门针对数据框的特殊函数,而在Scala中,由于其表达能力强大,您可以轻松地临时添加一个函数。
换句话说,它避免了使用特定的解决方案,因为语言设计足够好,可以快速轻松地构建自己的领域语言。


附注:df.columns出人意料地是Array[String]而不是Array[Column],也许他们想让它看起来像Python pandas的数据帧。


1
请问您能否提供pyspark的等效版本? - Harit Vishwakarma
1
我的“age”字段使用.withColumn(“age”,$“age”。cast(sql.types.DoubleType))出现“定义非法启动”。有什么建议吗? - BlueDolphin
如果我们要对许多列进行这些转换以提高性能,那么是否需要对数据框进行.cache()操作,或者因为Spark会优化它们,所以不需要呢? - skjagini
导入可以是 import org.apache.spark.sql.types._,然后不需要写 sql.types.IntegerType,只需写 IntegerType - nessa.gp

20

你可以使用 selectExpr 来使它更加简洁:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")

13

将DataFrame的数据类型从字符串修改为整数的Java代码

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
它将简单地将现有的(String数据类型)转换为Integer。

1
sql.types 中没有 DataTypes!应该是 DataType。此外,可以直接导入 IntegerType 并进行类型转换。 - Ehsan M. Kermani
@EhsanM.Kermani 实际上 DatyaTypes.IntegerType 是一个合法的引用。 - Cupitor
1
@Cupitor DataTypes.IntegerType 曾经处于DeveloperAPI模式,现在已经稳定在v.2.1.0版本 - Ehsan M. Kermani
这是最佳解决方案! - user1972382

11

我认为这对我来说更易读。

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

这将把您的年份列转换为 IntegerType,而不创建任何临时列并删除那些列。 如果您想要转换为其他任何数据类型,可以在 org.apache.spark.sql.types 包内检查类型。


8
为了将年份从字符串转换为整数,您可以向csv reader添加以下选项:“inferSchema” ->“true”,参见DataBricks文档。请注意保留HTML标记。

5
这个方法很不错,但问题在于读者必须再次阅读文件。 - beefyhalo
@beefyhalo 绝对正确,有什么解决办法吗? - Ayush

8

生成包含五个值的简单数据集并将 int 类型转换为 string 类型:

val df = spark.range(5).select( col("id").cast("string") )

6

如果您遇到像SQL Server这样的JDBC驱动程序保存问题,那么此方法才能起作用,但对于语法和类型错误,这确实非常有帮助。

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)

你能帮我在Java中实现相同的代码吗?以及如何将customJdbcDialect注册到DataFrame中。 - abhijitcaps
不错,我也用Vertica做了同样的事情,但是自从Spark 2.1以来,你只需要实现你需要的特定数据类型就可以了。dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse( throw new IllegalArgumentException(s"无法获取${dt.simpleString}的JDBC类型"))。 - Arnon Rodman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接