使用withColumn无法使用max()函数生成新列

6

我有一个像这样的数据集:

a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"])

我想要一个包含新列的数据集,该列的值等于其他三列中最大的值。 输出应如下所示:
+----+----+-----+-------+
|one |two |three|max_col|
+----+----+-----+-------+
|   1|   2|    3|      3|
|   0|   2|    1|      2|
|   9|   8|    7|      9|
+----+----+-----+-------+

我想使用withColumn,代码如下:

b = a.withColumn("max_col", max(a["one"], a["two"], a["three"]))

但是这会产生错误。
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark152/python/pyspark/sql/column.py", line 418, in __nonzero__
    raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

奇怪。 max 返回一个 bool 吗?根据 max 的文档,显然不是这样。好吧,很奇怪。

我觉得这很奇怪:

b = a.withColumn("max_col", a["one"] + a["two"] + a["three"]))

它能够工作的事实让我更加坚信 max 的行为某种程度上是我不理解的。

我还尝试了 b = a.withColumn("max_col", max([a["one"], a["two"], a["three"]])),它将三个列作为列表传递而不是三个单独的元素。这与上面的错误相同。

3个回答

14

实际上你需要的是 greatest 而不是 max

from pyspark.sql.functions import greatest

a.withColumn("max_col", greatest(a["one"], a["two"], a["three"]))

为了完整起见,您可以使用 least 来查找最小值:

from pyspark.sql.functions import least

a.withColumn("min_col", least(a["one"], a["two"], a["three"]))

关于您看到的错误,它非常简单。max依赖于丰富的比较。当您比较两个列时,会得到一个Column

type(col("a") < col("b")
## pyspark.sql.column.Column

PySpark明确禁止将列转换为布尔值(您可以查看Column.__nonzero__源代码),因为这是没有意义的。它只是一个逻辑表达式,无法在驱动程序上下文中进行评估。


1
如果我理解正确,您将列的最大值和行的最大值弄混了。实际上,.withColumn 需要接收一个列,而您需要进行行操作。
b=a.map(lambda row: (row.one, row.two, row.three, max(row)))
英译中:

b然后是一个RDD,您可以将其转换为数据框

(保留HTML标记)
b.toDF('one','two','three','max')

1

你不能使用Python中的max函数,因为它无法返回预期的pyspark.sql.Column。一个pyspark DataFrame函数的例子是array,它可以从几列构建一个列表,注意返回值:

http://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#array

为了实现你所需的功能,你可以编写一个用户自定义函数,如下(未经测试):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def my_max(*cols):
    return max(cols)

udf_my_max = udf(my_max, IntegerType)

df.withColumn('max_col', udf_my_max(a.columns))

1
很遗憾,这对我没用。毕竟你没有机会测试,这可能只是一个小问题/错误。我更喜欢使用DataFrames而不是RDDs,所以如果您找到可行的解决方案,我将不胜感激! - Katya Willard

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接