如何在Pyspark中使用Scala类

33
我一直在寻找是否有方法可以在Pyspark中使用Scala类,并且我没有找到关于这个主题的文档或指南。
假设我创建了一个简单的Scala类,其中使用了apache-spark的一些库,例如:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}
  • 有没有可能在 Pyspark 中使用这个类?
  • 难度是否太大?
  • 我必须创建一个 .py 文件吗?
  • 有没有指南展示如何做到这一点?

顺便说一句,我也看了一下 spark 的代码,感觉有些迷失,并且我无法为自己的目的复制它们的功能。

2个回答

44

是的,这是可能的,尽管有时并不容易。通常您需要一个Java(友好的)包装器,这样您就不必处理Scala功能,因为这些功能不能使用纯Java轻松表达,并且因此与Py4J网关不兼容。

假设您的类位于包com.example中,并且拥有名为df的Python DataFrame

df = ... # Python DataFrame

你需要:

  1. 使用你喜欢的构建工具构建一个jar包。

  2. 将它包含在驱动程序类路径中,例如在PySpark shell / spark-submit中使用--driver-class-path参数。根据你的代码情况,你可能还需要使用--jars选项来传递它。

  3. 从Python SparkContext实例中提取JVM实例:

  4. jvm = sc._jvm
    
  5. 从一个SQLContext实例中提取Scala SQLContext:

  6. ssqlContext = sqlContext._ssql_ctx
    
  7. df中提取Java DataFrame

  8. jdf = df._jdf
    
  9. 创建SimpleClass类的新实例:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  10. 调用exe方法并使用Python的DataFrame对结果进行封装:

  11. from pyspark.sql import DataFrame
    
    DataFrame(simpleObject.exe(), ssqlContext)
    
    结果应该是一个有效的 PySpark DataFrame。当然,你可以将所有步骤合并成一个调用。 重要提示: 仅当 Python 代码单独在驱动程序上执行时,此方法才可行。它不能在 Python action 或 transformation 中使用。有关详细信息,请参阅如何从操作或转换中使用 Java/Scala 函数?

如果Scala类也有替代构造函数,会发生什么?它应该能正常工作吗? - se7entyse7en

5

关于@zero323的回答的更新,考虑到Spark的API在过去六年中已经发生了变化,适用于Spark-3.2的方法如下:

  1. 将Scala代码编译成JAR文件(例如使用sbt assembly
  2. 将JAR文件包含在--jars参数中,与任何--py-files参数一起提交给spark-submit,以便进行本地包定义
  3. 在Python中提取JVM实例:
jvm = spark._jvm
  1. 提取SparkSession的Java表示:
jSess = spark._jsparkSession
  1. 提取Java句柄,用于将PySpark DataFrame“df”传递到Scala方法中:
jdf = df._jdf
  1. 在 PySpark 中创建一个新的 SimpleClass 实例:
simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")

调用`exe`方法并将其输出转换为PySpark的`DataFrame`:
from pyspark.sql import DataFrame

result = DataFrame(simpleObject.exe(), spark)

如果您需要传递额外的参数,例如 Python 字典,PySpark 可能会自动将它们转换为相应的 Java 类型,然后出现在 Scala 方法中。Scala 提供了 JavaConverters 包来帮助将其转换为更自然的 Scala 数据类型。例如,可以将 Python 字典传递到 Scala 方法中,并立即从 Java HashMap 转换为 Scala(可变)Map:
def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
  import scala.collection.JavaConverters._
  val params = jparams.asScala
  ...
}

谢谢分享这个。但是在传递参数时如何进行类型转换呢? - Gary Liu
谢谢分享这个。但是当传递参数时如何进行类型转换呢?例如,我有一个Scala类,其中有一个方法:read(paraA: String, paraB: Map[String, String])当我像这样调用该方法:read("abc", {"cde":"fgh"})它给了我以下错误:Method read([class java.lang.String, class java.util.HashMap]) does not exist.我猜这是因为Python字典被转换为HashMap,但原始方法需要一个Scala不可变Map? - Gary Liu
1
谢谢,Gary - 我已经添加了一个示例,展示你可能如何完成这个任务。 - rwp
感谢@rwp提供更新的Spark API答案。请问在第5步中,df是在scala还是python中定义的?我很难跟随您的步骤,尝试理解哪些步骤是在哪个"运行时堆栈"中完成的。谢谢! - geekyj
1
谢谢,@geekyj - 我已经修改了描述,试图让它更清晰,即“df”是您想要传递到Scala库中的任何数据帧,并且此步骤在PySpark中执行而不是Scala Spark中执行。 - rwp

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接