在PySpark中,将自定义函数应用于数据帧的选定列的单元格

9
假设我有一个数据框,看起来像这样:
+---+-----------+-----------+
| id|   address1|   address2|
+---+-----------+-----------+
|  1|address 1.1|address 1.2|
|  2|address 2.1|address 2.2|
+---+-----------+-----------+

我希望直接将自定义函数应用于“address1”和“address2”列中的字符串,例如:

def example(string1, string2):
    name_1 = string1.lower().split(' ')
    name_2 = string2.lower().split(' ')
    intersection_count = len(set(name_1) & set(name_2))

    return intersection_count

我希望将结果存储在一个新的列中,这样我的最终数据框将如下所示:
+---+-----------+-----------+------+
| id|   address1|   address2|result|
+---+-----------+-----------+------+
|  1|address 1.1|address 1.2|     2|
|  2|address 2.1|address 2.2|     7|
+---+-----------+-----------+------+

我曾试图将内置函数应用于整个列,以相同方式执行此操作,但出现了错误:

>>> df.withColumn('result', example(df.address1, df.address2))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in example
TypeError: 'Column' object is not callable

我到底做错了什么,如何在选定的列中应用自定义函数于字符串?

2个回答

10

您需要在Spark中使用自定义函数(UDF)

from pyspark.sql.functions import udf
example_udf = udf(example, LongType())
df.withColumn('result', example_udf(df.address1, df.address2))

1
谢谢!我只有一个问题:为什么我们特别使用LongType()?这与给定函数返回的数据类型有关吗? - Angie
2
是的,根据给定函数,这应该是返回类型。 - dumitru
1
我可以传递额外的参数吗?这些参数不是来自数据框。例如,如果我想定义一个示例:def example(source_name1, source_name2, string1, string2): return json.dumps({source_name1: string1, source_name2: string2}) - Angie
1
是的,您可以传递额外的参数。 - dumitru
1
但是当我尝试传递一个字符串时,我得到了以下错误:pyspark.sql.utils.AnalysisException: u"无法解析给定输入列的 'string1'。 - Angie
1
这是因为udf期望列,但您可以使用lit函数将这些值转换为列。例如:lit(“abcd”).as(“dummy_col”)将创建一个名为dummy_cal的列,并为所有行赋值“abcd”。 - dumitru

2
只需在PySpark中使用所需的包装器udf来“装饰”您的函数即可。
from pyspark.sql.functions import udf


@udf
def foo(...):
    ...

df.withColumn('result', foo(df['address1'], df['address2']))


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接