如何使用高阶函数transform?

9

这是关于transform高阶函数的问题 (https://issues.apache.org/jira/browse/SPARK-23908)。

有没有办法将其用作标准函数(在包org.apache.spark.sql.functions._中)?

我有一个字符串数组,想对每个字符串应用URI规范化。目前我使用了UDF来完成。我希望在spark 2.4.0中能够跳过UDF。

据我所见,应该像df.selectExpr("transform(i, x -> x + 1)")这样在selectExpr中使用,但它只能与selectExpr一起使用吗?

这样使用时是否有任何方式提供自定义函数进行转换?是否有任何方法实现它,还是要使用老式的UDF?


你能描述一下你的使用情况吗?我认为使用UDF在性能方面是最快的。 - Jacek Laskowski
我有一个字符串数组,想对每个字符串应用URI规范化。目前我是使用UDF来实现的。但我希望在Spark 2.4.0中可以跳过使用UDF。 - MitakaJ9
2个回答

9
有没有办法将其作为位于包org.apache.spark.sql.functions._中的标准函数使用?
目前它仅用于与SQL表达式一起使用,但如果您想返回Column,则使用expr。
org.apache.spark.sql.functions._

expr("transform(i, x -> x + 1)"): Column

使用这种方式,有没有办法提供自定义函数来进行转换?
可以使用Scala UDF*:
spark.udf.register("f", (x: Int) => x + 1)

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", expr("transform(xs, x -> f(x))"))
  .show

+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+

虽然它似乎与使用Seq的UDF没有任何实际好处,但它提供了对Python UDFs的部分支持(识别udfs,正确推导类型并进行调用),但截至2.4.0版本,序列化机制似乎已经出现问题(所有记录都作为None传递给UDF):

from typing import Optional
from pyspark.sql.functions import expr

sc.version

'2.4.0'

def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- xs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- xsinc: array (nullable = true)
 |    |-- element: integer (containsNull = true)

df.show()

+---+---------+-----+
| id|       xs|xsinc|
+---+---------+-----+
|  1|[1, 2, 3]| [,,]|
+---+---------+-----+

当然,在这里没有真正的性能提升潜力 - 它会分派到 BasePythonRunner,因此开销应与普通的 udf 相同。
相关的 JIRA 票据 SPARK-27052 - 在 transform 中使用 PySpark udf 会产生 NULL 值

你说如果想让一个高阶函数返回一列,就要使用 F.expr(),这让我想到在 SQL 中它们可以用于不仅仅是单独的列。这种情况是否可能? - Topde

1

似乎在Python中,正如上面的答案所述,在使用udf时,expr函数存在问题,但可以按照以下方式完成:

from typing import Optional
from pyspark.sql.functions import expr, transform

def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame(
        [(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", eval("transform(col('xs'),  lambda x: f(x))")))

df.show()

+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接