如何在数据集中存储自定义对象?

167
根据介绍Spark数据集

随着我们期待Spark 2.0的到来,我们计划对数据集(Datasets)进行一些令人兴奋的改进,具体包括:... 自定义编码器 - 尽管我们目前为各种类型自动生成编码器,但我们想开放一个API以支持自定义对象。

试图将自定义类型存储在Dataset中会导致以下错误:

找不到存储在数据集中的类型的编码器。通过导入sqlContext.implicits._可以支持原始类型(Int,String等)和产品类型(case类)。将在未来发布版本中添加其他类型的序列化支持。

或者:

Java.lang.UnsupportedOperationException: 未找到编码器....

是否有任何现有的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点。欢迎更新/改进问题和答案。


看起来他们在3.2中添加了支持 https://issues.apache.org/jira/browse/SPARK-23862 - Geoff Langenderfer
9个回答

263

更新

这个答案仍然有效且信息丰富,尽管自2.2/2.3以来情况有所改善,因为现在内置了编码器支持 SetSeqMapDateTimestampBigDecimal。如果你只使用 case classes 和常规的 Scala 类型,那么在 SQLImplicits 中的隐式编码器就足够了。


不幸的是,几乎没有任何添加来帮助解决这个问题。在 Encoders.scalaSQLImplicits.scala 中搜索 @since 2.0.0,会发现大多数与原始类型(和一些 case classes 的调整)有关。因此,首先要说的是:当前没有真正好的支持自定义类编码器。有了这个前提,接下来的内容将介绍一些技巧,尽可能地利用我们目前拥有的资源,但是要注意:这并不完美,我会尽力让所有限制清晰明确。

问题到底是什么

当你想要创建一个数据集时,Spark "需要编码器(将类型为 T 的 JVM 对象转换为内部 Spark SQL 表示形式,并从 SparkSession 通过隐式自动创建,或通过调用 Encoders 上的静态方法显式创建)"(摘自createDataset 的文档)。编码器将采用 Encoder[T] 的形式,其中 T 是你正在编码的类型。第一个建议是添加 import spark.implicits._(它提供了这些隐式编码器),第二个建议是使用这个编码器相关函数集显式传递隐式编码器。

普通类没有可用的编码器,因此

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

如果您在Dataset中存储其他类型数据,将会出现以下隐式相关的编译时错误:

无法为存储在Dataset中的类型找到编码器。导入sqlContext.implicits._ 支持原始类型(Int、String等)和产品类型(case classes)。将在未来的版本中添加对序列化其他类型的支持。

但是,如果您将导致以上错误的任何类型用某个扩展Product类的类进行封装,这个错误会令人困惑地推迟到运行时:

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

这段代码可以正常编译,但在运行时会出现如下错误:

java.lang.UnsupportedOperationException: 找不到 MyObj 的编码器

原因是 Spark 使用隐式转换在运行时才创建编码器(通过 Scala 反射)。在这种情况下,Spark 在编译时仅检查最外层的类是否扩展了 Product(所有 case 类都扩展了),并且直到运行时才意识到它仍然不知道如何处理 MyObj(如果我尝试创建一个 Dataset[(Int,MyObj)],Spark 会在运行时等待 MyObj 报错)。这些是急需解决的核心问题:

  • 一些扩展了 Product 的类尽管总是在运行时崩溃,但仍能编译通过。
  • 没有办法传递自定义编码器以支持嵌套类型(我无法为 MyObj 提供编码器以使 Spark 知道如何对 Wrap[MyObj](Int,MyObj) 进行编码)。

只需使用 kryo

每个人都建议使用kryo编码器解决问题。

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

但是,这很快就变得很繁琐了。特别是如果您的代码正在操作各种数据集、连接、分组等,那么您最终会积累一堆额外的隐式参数。所以,为什么不只是自动创建一个隐式参数,让它自动完成所有这些操作呢?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

现在,似乎我可以做任何我想做的事情(下面的示例在spark-shell中不起作用,在那里spark.implicits._会自动导入)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者准确来说,问题在于使用kryo会导致Spark将数据集中的每一行都存储为一个扁平的二进制对象。对于mapfilterforeach等操作来说,这已经足够了,但是对于join等操作,Spark需要将其分成多列。检查d2d3的模式,可以看到只有一个二进制列:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

元组的部分解决方案

因此,在Scala中利用隐式魔法(更多内容请参见6.26.3 过载解析),我可以创建一系列隐式对象,这些对象可以尽可能地完成工作,至少对于元组而言,并且可以很好地与现有的隐式对象协同工作:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

然后,利用这些隐式参数,我可以使我的上面的示例正常工作,尽管需要重命名一些列。
class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我还没有弄清楚如何在默认情况下获得预期的元组名称(_1_2,...)而无需重命名它们 - 如果有人想尝试一下,请 点击这里 查看名称"value"的引用和此处是通常添加元组名称的地方。但是,关键点是我现在有一个漂亮的结构化模式:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

因此,总的来说,这个解决方法:
  • 允许我们为元组获取单独的列(因此我们可以再次在元组上进行连接)
  • 我们可以再次依靠隐式转换(因此不需要在所有地方都传递kryo
  • import spark.implicits._几乎完全向后兼容(需要进行一些重命名)
  • 不能让我们在kyro序列化二进制列上进行连接,更别说连接那些可能有的字段了
  • 有一个不愉快的副作用,即将某些元组列重命名为"value"(如果需要,可以通过转换.toDF,指定新列名称并转换回数据集来撤消此操作,模式名称似乎通过连接保留,这是最需要的地方)。

类的部分解决方案

这个解决方法不太好,也没有好的解决方案。但是,现在我们已经有了上面的元组解决方案,我有一种直觉,另一个答案中的隐式转换解决方案也会稍微不那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集之后,您可能会使用数据框架方法重命名列。如果一切顺利,这真的是一个改进,因为现在我可以对类的字段执行连接。如果我只使用了一个扁平的二进制kryo序列化器,那是不可能的。

这里有一个做了一些事情的例子:我有一个类MyObj,它具有类型为Intjava.util.UUIDSet[String]的字段。第一个可以自己处理。虽然我可以使用kryo进行序列化,但第二个如果作为String存储会更有用(因为UUID通常是我想要连接的内容)。第三个真的只属于二进制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

现在,我可以使用这个工具创建一个带有良好结构的数据集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

这个模式向我展示了正确命名的I列,而且对于前两个,我可以使用它们进行连接。

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

1
@AlexeyS 我不这么认为。但是你为什么想要那样做呢?为什么不能使用我提出的最后一个解决方案?如果您可以将数据放入JSON中,那么应该能够提取字段并将它们放入一个case类中... - Alec
1
很不幸,这个答案的底线是没有可行的解决方案。 - baol
1
我的理解是,从性能的角度来看,数据集和数据框架(但不包括RDD,因为它们不需要编码器!)是等效的。不要低估数据集的类型安全性!尽管Spark在内部使用了大量的反射、强制转换等,但这并不意味着您不应该关心所公开的接口的类型安全性。但这让我更有信心创建基于自己的数据集的类型安全函数,这些函数在底层使用数据框架。 - Alec
1
我有一个完美运作的解决方案。它包括以下步骤:
  1. 定义一个自定义类的sparkSql UDT
  2. 注册它
  3. 将你的类封装在一个Product中(例如Tuple1)
- tmnd91
1
你有考虑过使用UDTRegistration吗? - Choppy The Lumberjack
显示剩余10条评论

35
  1. Using generic encoders.

    There are two generic encoders available for now kryo and javaSerialization where the latter one is explicitly described as:

    extremely inefficient and should only be used as the last resort.

    Assuming following class

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    you can use these encoders by adding implicit encoder:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    which can be used together as follows:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    It stores objects as binary column so when converted to DataFrame you get following schema:

    root
     |-- value: binary (nullable = true)
    

    It is also possible to encode tuples using kryo encoder for specific field:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Please note that we don't depend on implicit encoders here but pass encoder explicitly so this most likely won't work with toDS method.

  2. Using implicit conversions:

    Provide implicit conversions between representation which can be encoded and custom class, for example:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

相关问题:


解决方案1似乎不适用于已输入的集合(至少对于Set) 我得到了Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar] - Victor P.
@VictorP。恐怕在这种情况下,您需要为特定类型(kryo [Set [Bar]])编写编码器。同样,如果类包含字段Bar,则需要为整个对象编写编码器。这些都是非常粗糙的方法。 - zero323
@zero323 我也遇到了同样的问题。你能否提供一个编码整个项目的代码示例?非常感谢! - Rock
@Rock 我不确定你所说的“整个项目”是什么意思。 - zero323
根据@zero323的评论,"如果类包含字段Bar,则需要为整个对象编写编码器"。我的问题是如何对这个"整个项目"进行编码? - Rock
@Rock 噢,你是指“整个对象”,对吗?据我所知,您需要一个编码器 Encoder.kryo[SomeClassThatContainsBar]。当然,可能还有其他解决方案。例如,您可以使用带有 Bar 字段的元组编码器(请参见上面的 longBarEncoder 示例)。 - zero323

14

您可以使用UDTRegistration,然后Case Classes、元组等等……所有这些都能够正确地与您的自定义类型一起使用!

假设您想要使用自定义枚举:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

像这样注册:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

那就使用它吧!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

假设你想使用多态记录:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

...然后使用它的方式如下:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

您可以编写一个自定义 UDT,将所有内容编码为字节(这里我使用的是 Java 序列化,但最好还是使用 Spark 的 Kryo 上下文进行操作)。

首先定义 UDT 类:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

然后注册它:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

那么你就可以使用它了!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

1
我看不到你的 Kryo 在 CustomPolyUDT 中的使用位置。 - mathieu
1
我正在尝试在我的项目中定义一个UDT,但是我遇到了这个错误:“符号UserDefinedType无法从此处访问”。有什么帮助吗? - Rijo Joseph
1
嗨,@RijoJoseph。您需要在项目中创建一个名为org.apache.spark的包,并将您的UDT代码放入其中。 - Choppy The Lumberjack
我尝试了将我的代码放在org.apache.spark的一个包中,并调用注册方法。但仍然出现关于我的枚举类型没有编码器的错误...? - DCameronMauch

5

编码器在 Spark2.0 中的工作方式基本相同。而 Kryo 仍然是推荐的序列化选择。

您可以使用 spark-shell 查看以下示例:

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

到目前为止,现有范围内没有合适的编码器,因此我们的人员未被编码为二进制值。但是,一旦我们使用Kryo序列化提供一些隐式编码器,情况将会改变。请保留HTML标记。
// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

2
在使用编码器进行.show后,如何将值转换回普通的非二进制值? - jack

4
在Java Bean类的情况下,这可能会很有用。
import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

现在你可以将数据帧简单地读取为自定义数据帧。
dataFrame.as[MyClass]

这将创建一个自定义的类编码器而不是二进制编码器。


2
我的示例将使用Java语言,但我认为适应Scala语言也不难。
我已经成功地将`RDD`转换为`Dataset`,只要`Fruit`是一个简单的Java Bean,并且使用spark.createDatasetEncoders.bean方法。
步骤1:创建一个简单的Java Bean。
public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

我建议在DataBricks加强其Encoders之前,您应该坚持使用基本类型和字符串作为字段的类。如果您有一个嵌套对象的类,则创建另一个简单的Java Bean,将其所有字段展平,以便您可以使用RDD转换将复杂类型映射到更简单的类型。虽然这需要一点额外的工作,但我想它会在使用平面模式时对性能有很大帮助。
第二步:从RDD中获取数据集。
SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

"而且,完成了!起泡、冲洗,重复操作。"

我建议指出,对于简单的结构,最好将它们存储在本地Spark类型中,而不是将它们序列化为blob。它们在Python网关上的工作效果更好,在Parquet中更透明,甚至可以转换为相同形状的结构。 - metasim

1

对于那些和我处境相同的人,我也在这里提供我的答案。

具体来说,

  1. 我正在从SQLContext中读取'Set typed data'。所以原始数据格式是DataFrame。

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. 然后使用rdd.map()将其转换为可变.WrappedArray类型的RDD。

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    结果:

    (1,Set(1))


0
除了之前提到的建议,我最近发现的另一个选项是,您可以声明您的自定义类,包括traitorg.apache.spark.sql.catalyst.DefinedByConstructorParams
如果这个类有一个构造函数,它使用ExpressionEncoder可以理解的类型,即原始值和标准集合,那么就可以运行。当您无法将类声明为case class但不想在每次将其包含在数据集中时使用Kryo进行编码时,它会很方便。
例如,我想声明一个包含Breeze向量的case class。通常,唯一能够处理它的encoder是Kryo。但是,如果我声明一个子类,它扩展了Breeze DenseVector和DefinedByConstructorParams,那么ExpressionEncoder就明白它可以被序列化为Double数组。
这是我声明它的方式:
class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

现在我可以使用SerializableDenseVector在数据集中(直接或作为产品的一部分),只需使用简单的ExpressionEncoder而不需要Kryo。它的工作方式就像Breeze DenseVector,但序列化为Array[Double]。

0

@Alec的回答非常好!只是在他/她的回答中,我想添加一条评论:

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

@Alec提到:

无法传递自定义编码器以处理嵌套类型(我无法向Spark提供仅用于MyObj的编码器,使其知道如何对Wrap [MyObj]或(Int,MyObj)进行编码)。

看起来是这样,因为如果我添加一个MyObj的编码器:

implicit val myEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]

它仍然失败:

java.lang.UnsupportedOperationException: No Encoder found for MyObj
- field (class: "MyObj", name: "unwrap")
- root class: "Wrap"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)

但请注意重要的错误信息:

根类:"Wrap"

它实际上给了一个提示,即编码 MyObj 是不够的,你还必须编码包括 Wrap[T] 在内的整个链
所以如果我这样做,它解决了问题
implicit val myWrapperEncoder = org.apache.spark.sql.Encoders.kryo[Wrap[MyObj]]

因此,@Alec的评论并不完全正确:

我无法为Spark提供仅适用于MyObj的编码器,以便它知道如何对Wrap [MyObj]或(Int,MyObj)进行编码

我们仍然有一种方法来为Spark提供MyObj的编码器,以便它知道如何对Wrap [MyObj]或(Int,MyObj)进行编码。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接