如何在PySpark中将数据框的字符串类型列更改为双精度类型?

157

我有一个列为字符串的数据框。 我想在PySpark中将列类型更改为Double类型。

以下是我所做的方法:

toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

我只是想知道,这样做是否正确,因为在运行逻辑回归时,我遇到了一些错误,所以我想知道,这是问题的原因吗。

7个回答

270
这里不需要使用UDF。 Column 已经提供了 cast 方法,其带有 DataType 实例
from pyspark.sql.types import DoubleType

changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))

或短字符串:

changedTypedf = joindf.withColumn("label", joindf["show"].cast("double"))

规范字符串名称(也可以支持其他变体)对应于simpleString值。因此,对于原子类型:

from pyspark.sql import types 

for t in ['BinaryType', 'BooleanType', 'ByteType', 'DateType', 
          'DecimalType', 'DoubleType', 'FloatType', 'IntegerType', 
           'LongType', 'ShortType', 'StringType', 'TimestampType']:
    print(f"{t}: {getattr(types, t)().simpleString()}")

BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

比如说复杂类型

types.ArrayType(types.IntegerType()).simpleString()   

'array<int>'

types.MapType(types.StringType(), types.IntegerType()).simpleString()

'map<string,int>'

8
使用col函数同样有效。 from pyspark.sql.functions import colchangedTypedf = joindf.withColumn("label", col("show").cast(DoubleType())) - Staza
cast()参数(“string”语法)的可能值是什么? - Wirawan Purwanto
1
我简直不敢相信Spark文档在数据类型的有效字符串方面是如此简洁。我能找到的最接近的参考资料是这个:https://docs.tibco.com/pub/sfire-analyst/7.7.1/doc/html/en-US/TIB_sfire-analyst_UsersGuide/connectors/apache-spark/apache_spark_data_types.htm。 - Wirawan Purwanto
1
如何一次性转换多列? - hui chen
如何将可空类型更改为非可空类型? - pitchblack408

81

通过使用与输入列相同的名称,保留列名并避免添加额外的列:

from pyspark.sql.types import DoubleType
changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))

5
谢谢,我正在寻找如何保留原始列名的方法。 - WestCoastProjects
Spark会识别哪些短字符串数据类型?是否有一个列表可以查看? - alfredox
1
这个解决方案在循环中也非常出色,例如:from pyspark.sql.types import IntegerType for ftr in ftr_list: df = df.withColumn(f, df[f].cast(IntegerType())) - Quetzalcoatl
@Quetzalcoatl,你的代码有误。f是什么?你在哪里使用了ftr - Sheldore
是的,谢谢 -- 'f' 应该是 'ftr'。其他人可能已经想到了。 - Quetzalcoatl

16

给出的答案足以解决问题,但我想分享另一种方法,可能会在 Spark 的新版本中引入(我不确定),因此给出的答案没有涵盖它。

我们可以使用col("colum_name")关键字在 Spark 语句中访问列:

from pyspark.sql.functions import col
changedTypedf = joindf.withColumn("show", col("show").cast("double"))

谢谢!使用 'double'DoubleType() 更优雅,后者可能还需要导入。 - ZygD

7

PySpark版本:

df = <source data>
df.printSchema()

from pyspark.sql.types import *

# Change column type
df_new = df.withColumn("myColumn", df["myColumn"].cast(IntegerType()))
df_new.printSchema()
df_new.select("myColumn").show()

1
答案很简单 -
toDoublefunc = UserDefinedFunction(lambda x: float(x),DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

1

使用:

df1.select(col('show').cast("Float").alias('label')).show()

或者

df1.selectExpr("cast(show AS FLOAT) as label").show()

0

其他答案存在一个问题(取决于您的Pyspark版本),即使用withColumn。至少在v2.4.4中观察到了性能问题(请参见此thread)。spark docs关于withColumn的说明如下:

此方法在内部引入了投影。因此,多次调用它,例如通过循环添加多个列,可能会生成大型计划,从而导致性能问题甚至StackOverflowException。为避免这种情况,请一次使用多个列进行选择。

通常实现建议使用select的一种方法是:

from pyspark.sql.types import *
from pyspark.sql import functions as F

cols_to_fix = ['show']
other_cols = [col for col in joindf.columns if not col in cols_to_fix]
joindf = joindf.select(
    *other_cols,
    F.col('show').cast(DoubleType())
)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接