将Scala的case class转换为PySpark模式(schema)

3

假设有一个简单的Scala case类如下:

package com.foo.storage.schema   
case class Person(name: String, age: Int)

可以按照以下方式从一个case class创建Spark schema:

import org.apache.spark.sql._
import com.foo.storage.schema.Person  

val schema = Encoders.product[Person].schema

我想知道在Python/PySpark中是否可以从一个case class中访问schema。我希望能够像这样做[Python]:
jvm = sc._jvm
py4j_class = jvm.com.foo.storage.schema.Person 
jvm.org.apache.spark.sql.Encoders.product(py4j_class)

这会抛出一个错误 com.foo.storage.schema.Person._get_object_id 在JVM中不存在。在Scala中,Encoders.product是一个通用的类型,我不确定如何使用Py4J指定类型。有没有一种方法可以使用case类来创建PySpark模式?

1个回答

1
我发现使用泛型也不是很干净/容易的方法来做到这一点,也没有一个纯Scala函数可以实现。最终我做的是为case类创建一个伴生对象,它可以获取模式。 解决方案
package com.foo.storage.schema
case class Person(name: String, age: Int)
object Person {
  def getSchema = Encoders.product[Person].schema
}

此函数可以从Py4J中调用,但将返回一个JavaObject。它可以使用类似以下的辅助函数进行转换:

from pyspark.sql.types import StructType
import json
def java_schema_to_python(j_schema):
  json_schema = json.loads(ddl.json())
  return StructType.fromJson(json_schema)

最终,我们可以提取出我们的模式:
j_schema = jvm.com.foo.storage.Person.getSchema()
java_schema_to_python(j_schema)

另一种解决方案

我发现还有一种方法可以实现这个功能,但我更喜欢第一种方法。您可以编写一个通用函数,在Scala中推断出参数的类型,并使用该类型进行推断:

object SchemaConverter {
  def getSchemaFromType[T <: Product: TypeTag](obj: T): StructType = {
     Encoders.product[T].schema
  }
}

可以这样调用:
val schema = SchemaConverter.getSchemaFromType(Person("Joe", 42))

我不喜欢这种方法,因为它要求你创建一个虚拟实例来表示案例类。虽然我没有测试过,但我认为上面的函数也可以使用Py4J调用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接