How to define a schema for custom types in Spark SQL?

The following example code tries to put some case objects into a DataFrame. The code includes the definition of a case object hierarchy and a case class that uses this trait:

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}
```

When the code is executed, it unfortunately raises the following exception:

```
java.lang.UnsupportedOperationException: Schema for type Some is not supported
```

Questions

  • Is it possible to add or define a schema for certain types (here the type Some)?
  • Is there another way to represent this kind of enumeration?
    • I tried using Enumeration directly, but that did not succeed either (see below).

The code for the Enumeration:

```scala
object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}
```

Thanks in advance. I hope that the best approach is not to use strings.

1 Answer


Spark 2.0.0+:

The UserDefinedType has been made private in Spark 2.0.0, and there is currently no Dataset-friendly replacement available. For more information, refer to SPARK-14155 (Hide UserDefinedType in Spark 2.0). However, in most cases, a statically typed Dataset can serve as a replacement.

A Jira ticket (SPARK-7768) has been created to make the UDT API public again with a target version of 2.4. Refer to How to store custom objects in Dataset? for additional information.
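For illustration only (not part of the original answer), here is a minimal sketch of the Dataset route in Spark 2.x: since no encoder can be derived for the sealed trait, the record can be carried with a Kryo encoder, at the cost of storing it as a single opaque binary column instead of queryable fields:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data(name: String, t: Some)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DatasetExample")
      .master("local[*]")
      .getOrCreate()

    // Sketch: fall back to Kryo because no built-in encoder exists for the
    // sealed trait; the whole record becomes one binary column.
    val ds = spark.createDataset(
      Seq(Data("a", AType), Data("b", BType)))(Encoders.kryo[Data])

    // Typed operations still work on the deserialized objects.
    ds.map(_.name)(Encoders.STRING).show()

    spark.stop()
  }
}
```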

Spark < 2.0.0:

Is it possible to add or define a schema for certain types (in this case, "Some")?

The answer may depend on how much you need this functionality. It is technically possible to create a UserDefinedType, but it requires access to DeveloperApi, and the process is not straightforward or well-documented.

```scala
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  // Underlying SQL representation: each case object is stored as an Int.
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}
```
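With the annotation in place, the DataFrame creation from the question should then work unchanged. A hedged usage sketch, reusing the sc and sqlContext set up in the question:

```scala
// Sketch only: assumes the sc / sqlContext initialization shown in the question.
import sqlContext.implicits._

case class Data(name: String, t: Some)

val df = sc.parallelize(Seq(Data("a", AType), Data("b", BType)), 4).toDF()
df.printSchema() // the t column should now be reported with SomeUDT as its type
df.show()
```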

You will probably also want to override hashCode and equals.
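Since all SomeUDT instances behave identically, a minimal sketch of such overrides (my addition, not from the original answer) could simply compare by class:

```scala
class SomeUDT extends UserDefinedType[Some] {
  // ... sqlType, serialize, deserialize, userClass as above ...

  // Sketch: all SomeUDT instances are interchangeable, so compare by class only.
  override def hashCode(): Int = getClass.getName.hashCode
  override def equals(other: Any): Boolean = other.isInstanceOf[SomeUDT]
}
```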

The PySpark counterpart could look like this:

```python
from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(cls):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod
    def scalaUDT(cls):  # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1
```

In Spark versions earlier than 1.5, a Python UDT required a paired Scala UDT, but as of 1.5 this no longer seems to be necessary.
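A hedged usage sketch for the Python definitions above (assuming Spark 1.5+ behaviour, so no paired Scala UDT, and that the module is importable on the executors):

```python
# Sketch: schema inference should pick up SomeUDT via Some.__UDT__.
df = sqlContext.createDataFrame(
    [("a", Some.AType), ("b", Some.BType)],
    ["name", "t"])

df.printSchema()
df.show()
```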

For a simple UDT like this you can use simple types (for example IntegerType instead of a whole Struct).


Thanks @zero323! Are you involved in the UDT effort for 2.4? I posted a bunch of comments in https://issues.apache.org/jira/browse/SPARK-7768 trying to figure these things out. - Choppy The Lumberjack
