更新
这个答案仍然有效且信息丰富,尽管自2.2/2.3以来情况有所改善,因为现在内置了编码器支持 Set
、Seq
、Map
、Date
、Timestamp
和 BigDecimal
。如果你只使用 case classes 和常规的 Scala 类型,那么在 SQLImplicits
中的隐式编码器就足够了。
不幸的是,几乎没有任何添加来帮助解决这个问题。在 Encoders.scala
或 SQLImplicits.scala
中搜索 @since 2.0.0
,会发现大多数与原始类型(和一些 case classes 的调整)有关。因此,首先要说的是:当前没有真正好的支持自定义类编码器。有了这个前提,接下来的内容将介绍一些技巧,尽可能地利用我们目前拥有的资源,但是要注意:这并不完美,我会尽力让所有限制清晰明确。
问题到底是什么
当你想要创建一个数据集时,Spark "需要编码器(将类型为 T 的 JVM 对象转换为内部 Spark SQL 表示形式,并从 SparkSession
通过隐式自动创建,或通过调用 Encoders
上的静态方法显式创建)"(摘自createDataset 的文档)。编码器将采用 Encoder[T]
的形式,其中 T
是你正在编码的类型。第一个建议是添加 import spark.implicits._
(它提供了这些隐式编码器),第二个建议是使用这个编码器相关函数集显式传递隐式编码器。
普通类没有可用的编码器,因此
import spark.implicits._
class MyObj(val i: Int)
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
如果您在Dataset中存储其他类型数据,将会出现以下隐式相关的编译时错误:
无法为存储在Dataset中的类型找到编码器。导入sqlContext.implicits._ 支持原始类型(Int、String等)和产品类型(case classes)。将在未来的版本中添加对序列化其他类型的支持。
但是,如果您将导致以上错误的任何类型用某个扩展Product类的类进行封装,这个错误会令人困惑地推迟到运行时:
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
这段代码可以正常编译,但在运行时会出现如下错误:
java.lang.UnsupportedOperationException: 找不到 MyObj 的编码器
原因是 Spark 使用隐式转换在运行时才创建编码器(通过 Scala 反射)。在这种情况下,Spark 在编译时仅检查最外层的类是否扩展了 Product
(所有 case 类都扩展了),并且直到运行时才意识到它仍然不知道如何处理 MyObj
(如果我尝试创建一个 Dataset[(Int,MyObj)]
,Spark 会在运行时等待 MyObj
报错)。这些是急需解决的核心问题:
- 一些扩展了
Product
的类尽管总是在运行时崩溃,但仍能编译通过。
- 没有办法传递自定义编码器以支持嵌套类型(我无法为
MyObj
提供编码器以使 Spark 知道如何对 Wrap[MyObj]
或 (Int,MyObj)
进行编码)。
只需使用 kryo
每个人都建议使用kryo
编码器解决问题。
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
但是,这很快就变得很繁琐了。特别是如果您的代码正在操作各种数据集、连接、分组等,那么您最终会积累一堆额外的隐式参数。所以,为什么不只是自动创建一个隐式参数,让它自动完成所有这些操作呢?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
现在,似乎我可以做任何我想做的事情(下面的示例在spark-shell
中不起作用,在那里spark.implicits._
会自动导入)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2")
val d3 = d1.map(d => (d.i, d)).alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
或者准确来说,问题在于使用kryo
会导致Spark将数据集中的每一行都存储为一个扁平的二进制对象。对于map
、filter
和foreach
等操作来说,这已经足够了,但是对于join
等操作,Spark需要将其分成多列。检查d2
或d3
的模式,可以看到只有一个二进制列:
d2.printSchema
// root
// |
元组的部分解决方案
因此,在Scala中利用隐式魔法(更多内容请参见6.26.3 过载解析),我可以创建一系列隐式对象,这些对象可以尽可能地完成工作,至少对于元组而言,并且可以很好地与现有的隐式对象协同工作:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
然后,利用这些隐式参数,我可以使我的上面的示例正常工作,尽管需要重命名一些列。
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
我还没有弄清楚如何在默认情况下获得预期的元组名称(_1
,_2
,...)而无需重命名它们 - 如果有人想尝试一下,请 点击这里 查看名称"value"
的引用和此处是通常添加元组名称的地方。但是,关键点是我现在有一个漂亮的结构化模式:
d4.printSchema
因此,总的来说,这个解决方法:
- 允许我们为元组获取单独的列(因此我们可以再次在元组上进行连接)
- 我们可以再次依靠隐式转换(因此不需要在所有地方都传递
kryo
)
- 与
import spark.implicits._
几乎完全向后兼容(需要进行一些重命名)
- 不能让我们在
kyro
序列化二进制列上进行连接,更别说连接那些可能有的字段了
- 有一个不愉快的副作用,即将某些元组列重命名为"value"(如果需要,可以通过转换
.toDF
,指定新列名称并转换回数据集来撤消此操作,模式名称似乎通过连接保留,这是最需要的地方)。
类的部分解决方案
这个解决方法不太好,也没有好的解决方案。但是,现在我们已经有了上面的元组解决方案,我有一种直觉,另一个答案中的隐式转换解决方案也会稍微不那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集之后,您可能会使用数据框架方法重命名列。如果一切顺利,这真的是一个改进,因为现在我可以对类的字段执行连接。如果我只使用了一个扁平的二进制kryo
序列化器,那是不可能的。
这里有一个做了一些事情的例子:我有一个类MyObj
,它具有类型为Int
,java.util.UUID
和Set[String]
的字段。第一个可以自己处理。虽然我可以使用kryo
进行序列化,但第二个如果作为String
存储会更有用(因为UUID
通常是我想要连接的内容)。第三个真的只属于二进制列。
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
type MyObjEncoded = (Int, String, Set[String])
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
现在,我可以使用这个工具创建一个带有良好结构的数据集:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
这个模式向我展示了正确命名的I列,而且对于前两个,我可以使用它们进行连接。
d.printSchema
// root
// |
// |
// |