使用Spark的Kryo序列化器与具有字符串数组的Java协议缓冲区存在错误

3

我在使用Java协议缓冲类作为Spark作业中RDDs的对象模型时遇到了一个错误。

对于我的应用程序,我的.proto文件具有重复字符串的属性。例如:

message OntologyHumanName 
{ 
repeated string family = 1;
}

因此,2.5.0版本的protoc编译器会生成类似以下的Java代码:

private com.google.protobuf.LazyStringList family_ = com.google.protobuf.LazyStringArrayList.EMPTY;

如果我运行一个使用Kryo序列化器的Scala Spark作业,我会遇到以下错误

Caused by: java.lang.NullPointerException
at com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:61)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 40 more

使用spark.serializer=org.apache.spark.serializer.JavaSerializer时,相同的代码可以正常工作。

我的环境是CDH QuickStart 5.5和JDK 1.8.0_60。


我面临着使用kryo时类似的问题。你解决了吗?如果是这样,能否请您发布解决方案? - Pooja Mazumdar
2个回答

0
尝试使用以下代码注册Lazy类:
Kryo kryo = new Kryo()

kryo.register(com.google.protobuf.LazyStringArrayList.class)

对于自定义的 Protobuf 消息,可以参考此答案中的解决方案,用于注册由protoc生成的自定义/嵌套类。


-1

我认为你的RDD类型包含OntologyHumanName类,例如:RDD[(String, OntologyHumanName)],而这种类型的RDD在shuffle阶段恰好会出现。请参考:https://github.com/EsotericSoftware/kryo#kryoserializable kryo无法对抽象类进行序列化。

  1. 阅读Spark文档:http://spark.apache.org/docs/latest/tuning.html#data-serialization

    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
    val sc = new SparkContext(conf)
    
  2. 关于Kryo文档:

    public class SomeClass implements KryoSerializable {
       // ...
    
       public void write (Kryo kryo, Output output) {
          // ...
       }
    
       public void read (Kryo kryo, Input input) {
          // ...
       }
    }
    

但是类:OntologyHumanName是由protobuf自动生成的。所以我认为这不是一个好的做法。

  • 尝试使用 case class 替换 OntologyHumanName 类以避免直接对 OntologyHumanName 类进行序列化。我没有尝试过这种方式,不确定是否可行:

    case class OntologyHumanNameScalaCaseClass(val humanNames: OntologyHumanName)
    
  • 一个丑陋的解决方案。我只是将 protobuf 类转换为 Scala 对象。这样做不会出错,如下所示:

    import scala.collection.JavaConverters._
    
    val humanNameObj: OntologyHumanName = ...
    val families: List[String] = humamNameObj.getFamilyList.asScala  //使用这种方式替换 humanNameObj。
    
  • 希望能解决您上述的问题。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接