看起来没有支持替换无限值的功能。我尝试了下面的代码,但它不起作用。还是我漏掉了什么?
a=sqlContext.createDataFrame([(None, None), (1, np.inf), (None, 2)])
a.replace(np.inf, 10)
我需要做的是走一条痛苦的路吗:将PySpark DataFrame转换为pandas DataFrame,替换无限值,然后再将其转回PySpark DataFrame?
看起来没有支持替换无限值的功能。我尝试了下面的代码,但它不起作用。还是我漏掉了什么?
a=sqlContext.createDataFrame([(None, None), (1, np.inf), (None, 2)])
a.replace(np.inf, 10)
我需要做的是走一条痛苦的路吗:将PySpark DataFrame转换为pandas DataFrame,替换无限值,然后再将其转回PySpark DataFrame?
似乎没有支持替换无穷大值的功能。
实际上看起来像是 Py4J 的一个 Bug,而不是 replace
本身的问题。请参见Support nan/inf between Python and Java。
作为一种解决方法,您可以尝试使用 UDF(较慢的选项):
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, lit, udf, when
df = sc.parallelize([(None, None), (1.0, np.inf), (None, 2.0)]).toDF(["x", "y"])
replace_infs_udf = udf(
lambda x, v: float(v) if x and np.isinf(x) else x, DoubleType()
)
df.withColumn("x1", replace_infs_udf(col("y"), lit(-99.0))).show()
## +----+--------+-----+
## | x| y| x1|
## +----+--------+-----+
## |null| null| null|
## | 1.0|Infinity|-99.0|
## |null| 2.0| 2.0|
## +----+--------+-----+
或者像这样的表达式:
def replace_infs(c, v):
is_infinite = c.isin([
lit("+Infinity").cast("double"),
lit("-Infinity").cast("double")
])
return when(c.isNotNull() & is_infinite, v).otherwise(c)
df.withColumn("x1", replace_infs(col("y"), lit(-99))).show()
## +----+--------+-----+
## | x| y| x1|
## +----+--------+-----+
## |null| null| null|
## | 1.0|Infinity|-99.0|
## |null| 2.0| 2.0|
## +----+--------+-----+
DataFrame
不是Python对象,所以需要完整的往返处理。 - zero323(null, null)
而不是(null, 2)
? - Kevin Ghaboosi