如何在 PySpark DataFrame 中替换 infinity

8

看起来没有支持替换无限值的功能。我尝试了下面的代码,但它不起作用。还是我漏掉了什么?

a=sqlContext.createDataFrame([(None, None), (1, np.inf), (None, 2)])
a.replace(np.inf, 10)

我需要做的是走一条痛苦的路吗:将PySpark DataFrame转换为pandas DataFrame,替换无限值,然后再将其转回PySpark DataFrame?

1个回答

13

似乎没有支持替换无穷大值的功能。

实际上看起来像是 Py4J 的一个 Bug,而不是 replace 本身的问题。请参见Support nan/inf between Python and Java

作为一种解决方法,您可以尝试使用 UDF(较慢的选项):

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, lit, udf, when

df = sc.parallelize([(None, None), (1.0, np.inf), (None, 2.0)]).toDF(["x", "y"])

replace_infs_udf = udf(
    lambda x, v: float(v) if x and np.isinf(x) else x, DoubleType()
)

df.withColumn("x1", replace_infs_udf(col("y"), lit(-99.0))).show()

## +----+--------+-----+
## |   x|       y|   x1|
## +----+--------+-----+
## |null|    null| null|
## | 1.0|Infinity|-99.0|
## |null|     2.0|  2.0|
## +----+--------+-----+

或者像这样的表达式:

def replace_infs(c, v):
    is_infinite = c.isin([
        lit("+Infinity").cast("double"),
        lit("-Infinity").cast("double")
    ])
    return when(c.isNotNull() & is_infinite, v).otherwise(c)

df.withColumn("x1", replace_infs(col("y"), lit(-99))).show()

## +----+--------+-----+
## |   x|       y|   x1|
## +----+--------+-----+
## |null|    null| null|
## | 1.0|Infinity|-99.0|
## |null|     2.0|  2.0|
## +----+--------+-----+

@AlbertoBonsanto 因为DataFrame不是Python对象,所以需要完整的往返处理。 - zero323
@AlbertoBonsanto 另一个方面,这与 PySpark 无关,那就是 UDF 对于优化器来说只是一个黑盒子。这里不应该有影响,但一般来说,这意味着您无法推理需要 UDF 的操作。最后,据我所知,内部表示不使用标准的 Scala 类型。因此,即使在 Scala 或 Java 中,您也可以直接使用表达式而不是 UDF。 - zero323
谢谢,问题在于有时我很难想出如何使用表达式而不是“UDF”来实现这样的事情。我之所以问是因为我有一段代码,它使用“UDFs”将字母数组转换为“SparseVector”,但代码从未完成。 - Alberto Bonsanto
为什么DataFrame的最后一行包含(null, null)而不是(null, 2) - Kevin Ghaboosi
1
@KevinGhaboosi 这是由于类型不匹配引起的。Spark 不认为 Python 整数是 double / float 列的有效值。已经修复了(感谢您的编辑!)。 - zero323
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接