Spark Dataset使用TypedColumn进行select操作

15

查看Spark DataSet上的select()函数,有各种生成的函数签名:

(c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....)

这似乎暗示我应该能够直接引用MyClass的成员并实现类型安全,但我不确定如何做到...

ds.select("member") 当然可以运行...看起来ds.select(_.member)也可能以某种方式工作?

2个回答

22
在Scala DSL中,有多种方法可以标识一个Column

  • 使用符号:'name
  • 使用字符串:$"name"col(name)
  • 使用表达式:expr("nvl(name, 'unknown') as renamed")

要从Column获取TypedColumn,只需使用myCol.as[T]

例如:ds.select(col("name").as[String])


1
这个答案是正确的,但要注意这个 as[T] 不是类型安全的,如果假设错误的类型,它可能会在运行时出错。 - linehrr
1
好注意点。为了从编译器获得最大的帮助,你需要完全转换到Scala类型,例如 ds.as[T].map { t: T => ... }。请注意,数据转换成本会存在,因为内部Spark使用原始二进制数据而不是Scala类型。 - Sim

21
如果你想要等同于 ds.select(_.member) 的操作,只需要使用 map
case class MyClass(member: MyMember, foo: A, bar: B)
val ds: DataSet[MyClass] = ???
val members: DataSet[MyMember] = ds.map(_.member)

编辑:不使用map的原因。

更高效的方法是通过投影来完成相同的操作,而根本不需要使用map。虽然您会失去编译时的类型检查,但作为交换,Catalyst查询引擎有机会执行更多优化。正如下面@Sim在评论中所暗示的那样,主要的优化围绕着不需要将MyClass的整个内容从Tungsten内存空间反序列化到JVM堆内存中-只是为了调用访问器-然后将_.member的结果重新序列化回Tungsten。

为了举一个更具体的例子,让我们像这样重新定义我们的数据模型:

  // Make sure these are not nested classes 
  // (i.e. in a top level compilation units).
  case class MyMember(something: Double)
  case class MyClass(member: MyMember, foo: Int, bar: String)

这些需要是case类,这样SQLImplicits.newProductEncoder[T <: Product]就可以为我们提供一个隐式的Encoder[MyClass]Dataset[T] API所需。

现在我们可以让上面的例子更加具体:

  val ds: Dataset[MyClass] = Seq(MyClass(MyMember(1.0), 2, "three")).toDS()
  val membersMapped: Dataset[Double] = ds.map(_.member.something)

为了查看幕后发生了什么,我们使用explain()方法:
membersMapped.explain()

== Physical Plan ==
*(1) SerializeFromObject [input[0, double, false] AS value#19]
+- *(1) MapElements <function1>, obj#18: double
   +- *(1) DeserializeToObject newInstance(class MyClass), obj#17: MyClass
      +- LocalTableScan [member#12, foo#13, bar#14]

这使得与Tungsten的序列化明显地相关。

使用投影[^1]来获取相同的值:

val ds2: Dataset[Double] = ds.select($"member.something".as[Double])
ds2.explain()

== Physical Plan ==
LocalTableScan [something#25]

就这样!只需一步操作[^2]。除了将MyClass编码到原始数据集中,没有其他序列化。

[^1]: 投影被定义为$"member.something"而不是$"value.member.something"的原因与Catalyst自动投影单列DataFrame的成员有关。

[^2]: 公平地说,在第一个物理计划中的步骤旁边的*表示它们将由WholeStageCodegenExec执行,从而使这些步骤成为一个单独的即时编译JVM函数,该函数具有其自己的运行时优化集合。因此,在实践中,您必须进行经验测试以真正评估每种方法的优劣。


请注意,会有数据转换成本,因为内部Spark使用原始二进制数据而不是Scala类型。 - Sim
在这种情况下使用数据集的优点是什么?它只是在性能上进行类型安全的权衡吗?我不太明白何时使用数据集会有用! - Aravind Yarram
1
大多数情况下,您只需要使用Dataframe。有时,为了与其他函数进行互操作,您可能希望进入DataSet空间,以便能够调用“map”、“flatMap”等而不创建UDF。或者其他一些特殊情况。 - metasim

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接