Spark withColumn()函数执行幂函数。

6

我有一个数据框df,其中包含“col1”和“col2”两列。我想创建第三列,该列使用其中一列作为指数函数。

df = df.withColumn("col3", 100**(df("col1")))*df("col2")

然而,这总是导致以下错误:

TypeError: unsupported operand type(s) for ** or pow(): 'float' and 'Column'

我理解这是由于函数将df("col1")作为“Column”而不是该行的项目所导致的。
如果我执行
results = df.map(lambda x : 100**(df("col2"))*df("col2"))

这个方法是可行的,但我无法将其附加到原始数据框中。

有什么想法吗?

这是我第一次发布帖子,如果有任何格式问题,请见谅。

2个回答

14

从Spark 1.4开始,您可以使用以下方式使用pow函数:

from pyspark.sql import Row
from pyspark.sql.functions import pow, col

row = Row("col1", "col2")
df = sc.parallelize([row(1, 2), row(2, 3), row(3, 3)]).toDF()

df.select("*", pow(col("col1"), col("col2")).alias("pow")).show()

## +----+----+----+
## |col1|col2| pow|
## +----+----+----+
## |   1|   2| 1.0|
## |   2|   3| 8.0|
## |   3|   3|27.0|
## +----+----+----+

如果您使用旧版本的Python,那么Python UDF应该能解决问题:

import math
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

my_pow = udf(lambda x, y: math.pow(x, y), DoubleType())

1

补充一下已经得到认可的答案:现在可以做类似于OP尝试做的事情,即使用**运算符,甚至使用Python内置的pow函数:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pow as pow_

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, ), (2, ), (3, ), (4, ), (5, ), (6, )], 'n: int')

df = df.withColumn('pyspark_pow', pow_(df['n'], df['n'])) \
       .withColumn('python_pow', pow(df['n'], df['n'])) \
       .withColumn('double_star_operator', df['n'] ** df['n'])

df.show()

    +---+-----------+----------+--------------------+
    |  n|pyspark_pow|python_pow|double_star_operator|
    +---+-----------+----------+--------------------+
    |  1|        1.0|       1.0|                 1.0|
    |  2|        4.0|       4.0|                 4.0|
    |  3|       27.0|      27.0|                27.0|
    |  4|      256.0|     256.0|               256.0|
    |  5|     3125.0|    3125.0|              3125.0|
    |  6|    46656.0|   46656.0|             46656.0|
    +---+-----------+----------+--------------------+

可以看到,PySpark和Python的pow函数以及**运算符返回的结果相同。即使其中一个参数是标量值也能正常工作:

df = df.withColumn('pyspark_pow', pow_(2, df['n'])) \
       .withColumn('python_pow', pow(2, df['n'])) \
       .withColumn('double_star_operator', 2 ** df['n'])

df.show()
   
    +---+-----------+----------+--------------------+
    |  n|pyspark_pow|python_pow|double_star_operator|
    +---+-----------+----------+--------------------+
    |  1|        2.0|       2.0|                 2.0|
    |  2|        4.0|       4.0|                 4.0|
    |  3|        8.0|       8.0|                 8.0|
    |  4|       16.0|      16.0|                16.0|
    |  5|       32.0|      32.0|                32.0|
    |  6|       64.0|      64.0|                64.0|
    +---+-----------+----------+--------------------+
    

我认为Python的pow现在能够在PySpark列上工作的原因是,当只使用两个参数时,pow等同于**运算符(请参见文档here),而**运算符会使用对象自己的幂运算实现,如果该对象被操作时定义了幂运算实现(请参见此SO响应here)。
显然,PySpark的Column具有__pow__运算符的适当定义(请参见Columnsource code)。
我不确定为什么最初**运算符不起作用,但我认为这与Column最初的定义方式有关。
测试使用的堆栈是Python 3.8.5和PySpark 3.1.1,但我也看到了这种行为适用于PySpark >= 2.4。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接