如何将行映射到由protobuf生成的类?

6
我需要编写一个作业,读取DataSet [Row]并将其转换为DataSet [CustomClass],其中CustomClass是protobuf类。
val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}(protoEncoder)

然而,看起来 Protobuf 类并不是真正的 Java Bean,我在以下代码中遇到了 NPE 错误。
val x =  Encoders.bean(classOf[CustomClass])

如何确保作业能够发出一个类型为DataSet [CustomClass]的数据集,其中CustomClass是protobuf类。有关编写该类自定义编码器的任何指针/示例?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

Bean编码器在内部使用

JavaTypeInference.serializerFor(protoClass)

如果我尝试在我的自定义编码器中执行相同的操作,我会得到一个更详细的错误信息:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)

你能把NPE粘贴到你的问题中吗? - Jacek Laskowski
已添加堆栈跟踪,我相当确定此时发生的原因是Protobuf类不是有效的Java bean。 - Apurva
@JacekLaskowski:更新了两个堆栈跟踪(使用Encoders.bean以及在自定义编码器中使用类似的代码)-这有帮助吗? - Apurva
5个回答

3

我的编码器经验并不是很好,所以我建议你不要再花费更多时间在这上面。

我宁愿考虑其他选择,以及如何按照Spark的方式处理,并将Spark计算结果映射到最后一步生成的protobuf类中。


1
感谢@JacekLaskowski的指导,总体来说非常有帮助。我希望能够使用map操作以分布式方式将它们写入键值存储中。到目前为止,我的尝试看起来与您提到的非常相似,但没有成功。如果我成功了,我会发布更新。 - Apurva

2

如果你想将行(Row)转换为Protobuf类,可以使用sparksql-protobuf库。

这个库提供了在SparkSQL中使用Protobuf对象的工具。它提供了一种将由SparkSQL写入的Parquet文件读回作为兼容protobuf对象的RDD的方法。它还可以将protobuf对象的RDD转换为DataFrame。

请在你的build.sbt文件中添加依赖项。

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"

)

您可以参考库中的一些示例来开始使用。

示例1

示例2

希望这有所帮助!


谢谢,我看了一下这个,“它提供了一种方法,将由SparkSQL编写的parquet文件作为兼容protobuf对象的RDD读取回来” - 在我的情况下,这个假设不一定成立 - 底层表示不是parquet。 - Apurva
我没有使用过Spark和Protobuf,但这应该会对你有所帮助。 - koiralo
更多的背景信息,我尝试编写自己的编码器。val serializer = JavaTypeInference.serializerFor(protoClass) 这就是预料中的失败:Caused by: java.lang.UnsupportedOperationException: 无法推断出 xxx.yyy.CustomClass 类型,因为它不符合 bean 规范 at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430) - Apurva

0
我是这样做的:我使用了saurfang的sparksql-protobuf库(代码可在Github上获得)。你直接得到一个RDD [ProtoSchema],但很难转换为Dataset [ProtoSchema]。我主要使用它来获取信息并将其附加到另一个RDD中,其中包含用户定义的函数。

1:导入库

使用Maven:

<dependencies>
    <dependency>
        <groupId>com.github.saurfang</groupId>
        <artifactId>sparksql-protobuf_2.10</artifactId>
        <version>0.1.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-protobuf</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>
...

<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>bintray-saurfang-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/saurfang/maven</url>
    </repository>
</repositories>

2:以RDD[ProtoSchema]的形式读取数据

val sess: SparkSession = ...
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema])

(可选)添加 PathFilter(Hadoop API)

如果您想添加 PathFilter 类(就像在 Hadoop 中使用的那样),或者激活其他在 Hadoop 中可用的选项,您可以执行以下操作:

sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter])

但不要忘记清除你的Hadoop配置,以防你想使用你的SparkSession读取其他内容:

sess.sparkContext.hadoopConfiguration.clear()

0
虽然不是严格的答案,但我找到了一个变通方法。如果我们使用RDDs,则不需要编码器。
val rows =
      spark.sql("select * from tablename").as[CaseClass].rdd
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}

这将为我提供一个 Protobuf 类的 RDD,以便我可以进行操作。

为什么不直接使用sparksql-protobuf(saurfang的github)构建您想要的RDD [Proto]? - belka

0

默认的序列化对我的protobuf对象也不起作用。

然而,事实证明Spark内部使用的是Kryo。所以如果你这样做

Encoders.kryo(ProtoBuffObject.class)

它运行成功了。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接