PySpark:使用两个条件和三种结果的withColumn()函数

67

我正在使用Spark和PySpark。我试图实现与以下伪代码等效的结果:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)

我正在尝试在PySpark中实现这个,但我对语法不确定。有什么指针吗?我研究了 expr() 但无法使其正常工作。

请注意,df 是一个 pyspark.sql.dataframe.DataFrame

3个回答

110

有几种有效的方法可以实现这个。让我们从必需的导入开始:

from pyspark.sql.functions import col, expr, when

您可以在表达式中使用Hive IF函数:

new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)

或者when + otherwise

new_column_2 = when(
    col("fruit1").isNull() | col("fruit2").isNull(), 3
).when(col("fruit1") == col("fruit2"), 1).otherwise(0)

最后,您可以使用以下技巧:

from pyspark.sql.functions import coalesce, lit

new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))

使用示例数据:

df = sc.parallelize([
    ("orange", "apple"), ("kiwi", None), (None, "banana"), 
    ("mango", "mango"), (None, None)
]).toDF(["fruit1", "fruit2"])

您可以按以下方式使用:

(df
    .withColumn("new_column_1", new_column_1)
    .withColumn("new_column_2", new_column_2)
    .withColumn("new_column_3", new_column_3))

结果是:

+------+------+------------+------------+------------+
|fruit1|fruit2|new_column_1|new_column_2|new_column_3|
+------+------+------------+------------+------------+
|orange| apple|           0|           0|           0|
|  kiwi|  null|           3|           3|           3|
|  null|banana|           3|           3|           3|
| mango| mango|           1|           1|           1|
|  null|  null|           3|           3|           3|
+------+------+------------+------------+------------+

3
在 Spark 2.2+ 版本中,函数 'col' 对我来说不起作用。直接使用列名而不加引号即可。例如:new_column_1 = expr(" col_1 + int(col_2/15) ") - smishra

31

您需要按照以下方式使用UDF:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def func(fruit1, fruit2):
    if fruit1 == None or fruit2 == None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0

func_udf = udf(func, IntegerType())
df = df.withColumn('new_column',func_udf(df['fruit1'], df['fruit2']))

1
我从这个解决方案中得到了一些错误,@David。第一个错误通过from pyspark.sql.types import StringType解决了。第二个错误是:TypeError: 'int' object is not callable,我不确定如何解决。请注意,df是一个pyspark.sql.dataframe.DataFrame - user2205916
1
@user2205916,我有几个错别字。在def func(...这一行中,我写成了fruit 1(有一个空格),而不是fruit1。在以func_udf =...开头的那一行中,我写成了StringType,而不是IntegerType。请使用更新后的代码尝试一下,如果仍然有问题,请告诉我。 - David
我收到了相同的错误信息。此外,我认为在“df = ...”的末尾缺少一个括号。 - user2205916
糟糕,又出现了一个打字错误,倒数第二行应该是 func_udf = udf(func, IntegerType()) - David
得赶快运行,但这已经很接近了(错别字除外)。如果仍然无法正常工作,请确保您没有像这样的情况:https://dev59.com/6mkw5IYBdhLWcg3wm70h - David

19

pyspark中的withColumn函数使您能够根据条件创建新变量,加入when和otherwise函数,就可以拥有一个正常工作的if then else结构。

为了使用这些功能,您需要导入sparksql函数,因为如果没有col()函数,以下代码将无法工作。

在第一部分中,我们声明了一个新列-'new column',然后将条件放在when函数中(即fruit1==fruit2),如果条件成立,则返回1,否则流程进入otherwise函数,使用isNull()函数处理第二个条件(fruit1或fruit2为空),如果是true,则返回3,否则再次检查otherwise并返回0作为答案。

from pyspark.sql import functions as F

df=df.withColumn('new_column', 
    F.when(F.col('fruit1')==F.col('fruit2'), 1)
    .otherwise(F.when((F.col('fruit1').isNull()) | (F.col('fruit2').isNull()), 3))
    .otherwise(0))

你能否解释一下你的代码?这样新手也能理解你做了什么。 - sayalok
2
@Nidhi,如果fruit1fruit2来自不同的数据框,是否可以执行类似的操作? - jgtrz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接