如何在Spark SQL的DataFrame中更改列类型?

181

假设我正在做这样的事情:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

但是我真的希望 year 是一个 Int(并且可能转换其他列)。

我能想到的最好方法是:

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

这有些复杂。

我的背景是R语言,我习惯于能够编写例如:

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

我可能漏掉了一些东西,因为在Spark/Scala中应该有更好的方法来实现这个...


1
我喜欢这种方式 spark.sql("SELECT STRING(NULLIF(column,'')) as column_string") - Eric Bellet
23个回答

5

建议使用cast方法,但需要说明的是,在Spark 1.4.1中,cast方法存在问题。

例如,一个包含字符串列且值为“8182175552014127960”的数据框,在转换为bigint后的值为“8182175552014128100”。

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

在生产环境中,我们使用了bigint列,这导致在找到此错误之前我们遇到了很多问题。


4
嘘,升级你的Spark。 - msemelman
2
@msemelman,因为一个小错误就必须在生产环境中升级到新版本的Spark,这太荒谬了。 - sauraI3h
我们难道不是总是为了小问题升级一切吗? :) - caesarsol

5
df.select($"long_col".cast(IntegerType).as("int_col"))

4
您可以使用以下代码。
df.withColumn("year", df("year").cast(IntegerType))

这将把年份列转换为整数类型列。


4

使用Spark Sql 2.4.0,您可以做到这一点:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")

2
另一个解决方案如下:
1)将“inferSchema”保持为False 2)在对行运行“Map”函数时,您可以读取“asString”(row.getString...)。
//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema","false")
            .load(args[0]);

JavaRDD<Box> vertices = enginesDataSet
            .select("BOX","BOX_CD")
            .toJavaRDD()
            .map(new Function<Row, Box>() {
                @Override
                public Box call(Row row) throws Exception {
                    return new Box((String)row.getString(0),(String)row.get(1));
                }
            });

2

2
很多答案,但没有很详细的解释。
以下语法适用于使用Spark 2.4的Databricks笔记本。
from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))

请注意,您必须指定您拥有的条目格式(在我的情况下为“MM-dd-yyyy”),并且导入是强制性的,因为to_date是一个spark sql函数。
还尝试了这个语法,但得到的是null而不是正确的转换:
df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(请注意,为了使语法正确,我不得不使用括号和引号)


PS:我必须承认这就像是一个语法丛林,有许多可能的入口点,而官方API参考缺乏适当的示例。


1
语法丛林。是的。这正是现在Spark的世界。 - conner.xyz

2

这种方法将删除旧列并创建具有相同值和新数据类型的新列。当创建DataFrame时,我的原始数据类型为:

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

接下来我运行以下代码以更改数据类型:

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

在此之后,我的结果如下:-
root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)

请在这里提供您的解决方案。 - Ajay Kharade

1
使用Spark SQL中的cast可以更改列的数据类型。 表名为table,只有两列column1和column2,需要更改column1的数据类型。 例如:spark.sql("select cast(column1 as Double) column1NewName,column2 from table") 在Double的位置上写入您想要的数据类型。

1
另一种方法:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接