How to use Spark Dataset with Thrift?

My data format is defined with Apache Thrift, and the code is generated by Scrooge. I store it in Spark using Parquet, following an approach much like the one explained in the blog post. I can read that data into a DataFrame easily enough:
val df = sqlContext.read.parquet("/path/to/data")

With a bit more ceremony, I can also read it into an RDD:

import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.thrift.{ParquetThriftInputFormat, ThriftReadSupport}
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.thrift.TBase
import scala.reflect.ClassTag

// Assumes a SparkContext `sc` and a Hadoop `jobConf` are already in scope.
def loadRdd[V <: TBase[_, _]](inputDirectory: String, vClass: Class[V]): RDD[V] = {
  implicit val ctagV: ClassTag[V] = ClassTag(vClass)
  ParquetInputFormat.setReadSupportClass(jobConf, classOf[ThriftReadSupport[V]])
  ParquetThriftInputFormat.setThriftClass(jobConf, vClass)
  val rdd = sc.newAPIHadoopFile(
    inputDirectory, classOf[ParquetThriftInputFormat[V]], classOf[Void], vClass, jobConf)
  rdd.asInstanceOf[NewHadoopRDD[Void, V]].values
}
loadRdd("/path/to/data", classOf[MyThriftClass])

My question is: how do I access this data through the new Dataset API shipped with Spark 1.6? The reason I want to use the Dataset API is its promise of type safety at the same speed as DataFrames.
I know some kind of encoder is needed, and encoders are already provided for primitive types and case classes, but what I have is thrift-generated code (Java or Scala) that looks a lot like a case class without actually being one.
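One fallback worth noting (my own sketch, not something the question's setup provides): Spark 1.6's Encoders.kryo accepts arbitrary classes, so the RDD produced by loadRdd above can be wrapped into a Dataset. The MyThriftStruct class below is a hypothetical stand-in for the generated struct:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SQLContext}

// Hypothetical stand-in for the Scrooge-generated struct; substitute your
// real generated class.
class MyThriftStruct(val id: Long, val label: String) extends Serializable

// Kryo fallback sketch: each object is stored as a single opaque binary
// column, so you get the typed API but lose column pruning and most
// Catalyst optimizations -- not the DataFrame-speed path the question asks for.
def toDataset(sqlContext: SQLContext, rdd: RDD[MyThriftStruct]): Dataset[MyThriftStruct] = {
  implicit val enc: Encoder[MyThriftStruct] = Encoders.kryo[MyThriftStruct]
  sqlContext.createDataset(rdd)
}
```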
I tried the obvious options, without success:
val df = sqlContext.read.parquet("/path/to/data")

df.as[MyJavaThriftClass]

<console>:25: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._  Support for serializing other types will be added in future releases.

df.as[MyScalaThriftClass]

scala.ScalaReflectionException: <none> is not a term
  at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
  at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:492)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided


df.as[MyScalaThriftClass.Immutable]

java.lang.UnsupportedOperationException: No Encoder found for org.apache.thrift.protocol.TField
- field (class: "org.apache.thrift.protocol.TField", name: "field")
- array element class: "com.twitter.scrooge.TFieldBlob"
- field (class: "scala.collection.immutable.Map", name: "_passthroughFields")
- root class: "com.worldsense.scalathrift.ThriftRange.Immutable"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.toCatalystArray$1(ScalaReflection.scala:419)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:537)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided

Shapeless seems to work well with thrift-generated code, and I wonder whether it could be used to generate something the current encoder API would accept.
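Short of that, one manual bridge (an assumption on my part, not something the encoder API does for you): map each thrift object into a hand-written case class mirroring its fields, for which Spark derives a Product encoder automatically. The RangeRow fields below are hypothetical, guessed from the ThriftRange name in the stack trace:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SQLContext}

// Hypothetical mirror of com.worldsense.scalathrift.ThriftRange; replace the
// fields with your struct's actual ones and extract them from the thrift
// object in the map step.
case class RangeRow(begin: Long, end: Long)

// One extra map pass converts field tuples into case-class instances, for
// which the import of sqlContext.implicits._ derives a Product encoder.
def toDataset(sqlContext: SQLContext, rdd: RDD[(Long, Long)]): Dataset[RangeRow] = {
  import sqlContext.implicits._
  rdd.map { case (b, e) => RangeRow(b, e) }.toDS()
}
```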

Any hints?


Have you looked at frameless? - Miles Sabin
Hi @MilesSabin, it looks very promising, but from reading the code I could not tell whether it can work without case classes at all. In fact, the only public API, RichDataSet, already starts from a Dataset. I will ping the Gitter channel and see whether the authors have any good advice. - Davi
Did you ever solve this? - habitats
1 Answer


Passing an Encoders.bean instance explicitly where the implicit encoder parameter is expected should solve this. Note that Encoders.bean takes a Class[T], so use classOf rather than getClass.

For example: df.as[MyJavaThriftClass](Encoders.bean(classOf[MyJavaThriftClass]))

