I am trying to implement a custom UDT and reference it from Spark SQL (as described in Section 4.4.2 of the Spark SQL white paper).
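The real use case is a custom UDT backed by an off-heap data structure using Cap'n Proto or similar.
For this post I have put together a contrived example. I know I could just use Scala case classes and not have to do any work at all, but that is not my goal.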
For example, I have an object called Person with a few attributes, and I want to be able to run SELECT person.first_name FROM person. Instead I get the error Can't extract value from person#1, and I am not sure why.
Here is the full source (also available at https://github.com/andygrove/spark-sql-udt):
package com.theotherandygrove

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Example {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val schema = StructType(List(
      StructField("person_id", DataTypes.IntegerType, true),
      StructField("person", new MockPersonUDT, true)))

    // load initial RDD
    val rdd = sc.parallelize(List(
      MockPersonImpl(1),
      MockPersonImpl(2)
    ))

    // convert to RDD[Row]
    val rowRdd = rdd.map(person => Row(person.getAge, person))

    // convert to DataFrame (RDD + Schema)
    val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

    // register as a table
    dataFrame.registerTempTable("person")

    // selecting the whole object (SELECT person FROM person) works fine;
    // this query, which references attributes of the UDT, is the one that fails
    val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")

    val people = results.collect()
    people.foreach(println)
  }
}

trait MockPerson {
  def getFirstName: String
  def getLastName: String
  def getAge: Integer
  def getState: String
}

class MockPersonUDT extends UserDefinedType[MockPerson] {

  // attributes declared for the UDT
  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable = false),
    StructField("lastName", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("state", StringType, nullable = false)
  ))

  override def userClass: Class[MockPerson] = classOf[MockPerson]

  // the serialized form is just the age; the mock rebuilds everything else from it
  override def serialize(obj: Any): Any = obj.asInstanceOf[MockPersonImpl].getAge

  override def deserialize(datum: Any): MockPerson = MockPersonImpl(datum.asInstanceOf[Integer])
}

@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {
  def getFirstName = "First" + n
  def getLastName = "Last" + n
  def getAge = n
  def getState = "AK"
}

If I just execute SELECT person FROM person, the query works. I just cannot reference the attributes in SQL, even though they are defined in the schema.
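
To make the shape declared in sqlType concrete, here is a small Spark-free sketch of a value with one entry per declared field, looked up by field name. It is only an illustration under my own assumptions: PersonFields, toFields and fieldByName are hypothetical names, not Spark APIs, and I am not claiming this is how Spark resolves person.first_name or that it explains the error.

```scala
// Plain Scala sketch with no Spark dependency.
// PersonFields, toFields and fieldByName are hypothetical helper names.
object PersonFields {

  // Field names in the same order as the StructType returned by sqlType.
  val fieldNames: Seq[String] = Seq("firstName", "lastName", "age", "state")

  // Flatten the four attributes into one value per declared field.
  def toFields(firstName: String, lastName: String, age: Int, state: String): Seq[Any] =
    Seq(firstName, lastName, age, state)

  // Look up a single attribute by its declared field name.
  def fieldByName(fields: Seq[Any], name: String): Option[Any] = {
    val idx = fieldNames.indexOf(name)
    if (idx >= 0 && idx < fields.length) Some(fields(idx)) else None
  }
}
```

With toFields("First1", "Last1", 1, "AK"), looking up "firstName" yields Some("First1"), while "first_name" yields None. The schema above declares firstName, whereas my query asks for first_name, so the names would not line up even in this simplified model.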