使用函数的pySpark withColumn

3

我有一个数据框,其中有2列:account_idemail_address,现在我想添加另外一列updated_email_address,我会调用某个函数对email_address进行处理来获取updated_email_address。这是我的代码:

def update_email(email):
  print("== email to be updated: " + email)
  today = datetime.date.today()
  updated = substring(email, -8, 8) + str(today.strftime('%m')) + str(today.strftime('%d')) + "_updated"
  return updated

df.withColumn('updated_email_address', update_email(df.email_address))

但结果显示updated_email_address列为空:

+---------------+--------------+---------------------+
|account_id     |email_address |updated_email_address|
+---------------+--------------+---------------------+
|123456gd7tuhha |abc@test.com  |null           |
|djasevneuagsj1 |cde@test.com  |null           |
+---------------+--------------+---------------+

在函数 updated_email 内部,它打印出了以下内容:
Column<b'(email_address + == email to be udpated: )'>

同时还显示了数据框中列的数据类型:

dfData:pyspark.sql.dataframe.DataFrame
account_id:string
email_address:string
updated_email_address:double

为什么updated_email_address列类型是double?

1
你需要使用一个UDF(用户自定义函数)。 https://docs.databricks.com/spark/latest/spark-sql/udf-python.html - IWHKYB
3个回答

9

您正在使用 Column 类型调用 Python 函数。您需要从 update_email 创建 udf,然后使用它:

update_email_udf = udf(update_email)

然而,我建议您不要使用UDF来进行这样的转换,您可以仅使用Spark内置函数来完成(因为UDF性能较差):

df.withColumn('updated_email_address',
              concat(substring(col("email_address"), -8, 8), date_format(current_date(), "ddMM"), lit("_updated"))
             ).show()

您可以在此处找到所有Spark SQL内置函数的文档链接


4

多亏了您,我得重新学习我的Spark班上遗忘的内容。

您不能直接使用WithColumn调用自定义函数,需要使用用户定义函数(UDF)。

以下是一个快速示例,演示如何将自定义函数与您的数据框架配合使用(StringType是函数的返回类型)。

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def update_email(email):
  return email+"aaaa"
#df.dtypes

my_udf = udf(lambda x: update_email(x), StringType())

df.withColumn('updated_email_address', my_udf(df.email_address) ).show()

-1

您可以使用pyspark库调用直接的Python函数来实现输出。以下是代码片段:

def update_email(email): print("== 要更新的电子邮件:" + email) return F.concat(F.substring(F.col(email), -8, 8),F.date_format(F.current_timestamp(),"yyyy MM dd"), F.lit("_updated"))

df=spark.read.format('csv').option('delimiter','|').option('header','true').load('sample4.csv') df=df.withColumn("updatedemail",update_email("email")).show(truncate=False)

== 需要更新的电子邮件:电子邮件 +--------------+-------------+--------------------------+ |账户ID |电子邮件 |更新后的电子邮件 | +--------------+-------------+--------------------------+ |123456gd7tuhha|abc@gmail.com|mail.com2022 05 14_updated| |djasevneuagsj1|def@gmail.com|mail.com2022 05 14_updated|dsjf +--------------+-------------+--------------------------+


1
你的回答可以通过提供更多支持信息来改进。请编辑以添加进一步的细节,例如引用或文档,以便他人可以确认你的答案是正确的。您可以在帮助中心中找到有关如何编写良好答案的更多信息。 - Community

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接