有没有办法将其作为位于包org.apache.spark.sql.functions._中的标准函数使用?
目前它仅用于与SQL表达式一起使用,但如果您想返回Column,则使用expr。
org.apache.spark.sql.functions._
expr("transform(i, x -> x + 1)"): Column
使用这种方式,有没有办法提供自定义函数来进行转换?
可以使用Scala UDF*:
spark.udf.register("f", (x: Int) => x + 1)
Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
.withColumn("xsinc", expr("transform(xs, x -> f(x))"))
.show
+---+---------+---------+
| id| xs| xsinc|
+---+---------+---------+
| 1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+
虽然它似乎与使用Seq
的UDF没有任何实际好处,但它提供了对Python UDFs的部分支持(识别udfs,正确推导类型并进行调用),但截至2.4.0版本,序列化机制似乎已经出现问题(所有记录都作为None
传递给UDF):
from typing import Optional
from pyspark.sql.functions import expr
sc.version
'2.4.0'
def f(x: Optional[int]) -> Optional[int]:
return x + 1 if x is not None else None
spark.udf.register('f', f, "integer")
df = (spark
.createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
.withColumn("xsinc", expr("transform(xs, x -> f(x))")))
df.printSchema()
root
|-- id: long (nullable = true)
|-- xs: array (nullable = true)
| |-- element: long (containsNull = true)
|-- xsinc: array (nullable = true)
| |-- element: integer (containsNull = true)
df.show()
+---+---------+-----+
| id| xs|xsinc|
+---+---------+-----+
| 1|[1, 2, 3]| [,,]|
+---+---------+-----+
当然,在这里没有真正的性能提升潜力 - 它会分派到
BasePythonRunner
,因此开销应与普通的
udf
相同。
相关的 JIRA 票据
SPARK-27052 - 在 transform 中使用 PySpark udf 会产生 NULL 值。