使用Scala类作为pyspark的UDF

4

我正在尝试在使用Apache Spark时将一些计算从Python转移到Scala。我想使用Java的类接口来使用持久变量,例如(这是基于我更复杂用例的非意义MWE):

package mwe

import org.apache.spark.sql.api.java.UDF1

class SomeFun extends UDF1[Int, Int] {
  private var prop: Int = 0

  override def call(input: Int): Int = {
    if (prop == 0) {
      prop = input
    }
    prop + input
  }
}

现在我正在尝试在pyspark中使用这个类:

import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext

conf = pyspark.SparkConf()
conf.set("spark.jars", "mwe.jar")
sc = SparkContext.getOrCreate(conf)

sqlContext = SQLContext.getOrCreate(sc)
sqlContext.registerJavaFunction("fun", "mwe.SomeFun")

df0 = sc.parallelize((i,) for i in range(6)).toDF(["num"])
df1 = df0.selectExpr("fun(num) + 3 as new_num")
df1.show()

并且收到以下异常:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(UDF:fun(num) + 3)' due to data type mismatch: differing types in '(UDF:fun(num) + 3)' (struct<> and int).; line 1 pos 0;\n'Project [(UDF:fun(num#0L) + 3) AS new_num#2]\n+- AnalysisBarrier\n      +- LogicalRDD [num#0L], false\n"

应该如何正确实现这个功能?我需要借助Java本身的类吗?如果有提示,我将不胜感激!

1个回答

3
异常的来源是使用不兼容的类型:
  • First of all o.a.s.sql.api.java.UDF* objects require external Java (not Scala types), so UDF expecting integers should take boxed Integer (java.lang.Integer) not Int.

    class SomeFun extends UDF1[Integer, Integer] {
      ...
      override def call(input: Integer): Integer = {
        ...
    
  • Unless you use legacy Python num column uses of LongType not IntegerType:

    df0.printSchema()
    root
     |-- num: long (nullable = true)
    

    So the actual signature should be

    class SomeFun extends UDF1[java.lang.Long, java.lang.Long] {
      ...
      override def call(input: java.lang.Long): java.lang.Long = {
        ...
    

    or data should be casted before applying UDF

    df0.selectExpr("fun(cast(num as integer)) + 3 as new_num")
    

最后,在UDFs中不允许使用可变状态。它不会引起异常,但整体行为将是不确定的。


谢谢!我使用了MWE进行了复制,现在必须调整我的其他代码,并查看是否有避免可变状态的方法(确定性并不关键)。 - matz-e

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接