Kryo: 反序列化旧版本的类

7
我需要通过添加两个新参数来修改一个类。这个类是用Kryo序列化的。 我目前将与此类相关的信息作为RDD之一持久化,每次停止流时都会这样做。 当我重新启动流时,我加载之前持久化的信息,并使用它们以保持停止和重新启动时的一致性。
由于我持久化的类需要这些新参数,所以我通过添加新的kryo.writeObject(output, object, ObjectSerializer)和kryo.readObject(input, classOf[Object], ObjectSerializer)来更改类和序列化程序以支持新参数。
现在,每当我重新启动流时,都会出现异常:“遇到未注册的类...”。
这似乎很明显,因为我尝试反序列化一个不包含在我停止流时持久化的信息中的对象。 如果我删除这些数据并像没有任何先前运行一样启动流,则不会发生异常。
有没有办法避免这种异常? 也许可以通过在缺少这些参数时指定一些默认值来实现?
我发现了一些有用的东西,以前没有看到过:Kryo issue 194
这个人通过简单地插入一个长整型来实现版本控制,以确定应该使用哪个版本的反序列化器。虽然这是一个简单的解决方案,但由于编写我正在工作的代码的公司没有考虑向前兼容性,我想我不得不放弃之前持久化的所有数据。
请告诉我是否有人能提供更好的解决方案。
编辑2:
仍然遇到这种情况的问题。 我尝试使用CompatibleFieldSerializer,如此描述:CompatibleFieldSerializer Example 因此,注册这个序列化程序而不是先前使用的自定义序列化程序。 结果是,现在重新加载持久化数据时,它会产生java.lang.NullPointerException。 如果没有先前持久化的数据,则仍然没有问题。我可以启动我的流,序列化新数据,停止流,反序列化并重新启动我的流。 仍然没有解决方法。
2个回答

5
几个月前,我们找到了这个问题的解决方案。因此,我想尽快发布一个答案来回答这个问题。 问题在于代码中存在错误,导致类使用标准Kryo FieldSerializer进行序列化,这不是向前兼容的。 我们必须执行以下操作才能反序列化旧类并将其转换为新的序列化类。
情况如下:
case class ClassA(field1 : Long, field2 : String)

它被序列化为这样:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       classA( 
           field1 = input.readLong(),
           field2 = input.readLong()
       )

为了使用序列化器对一系列类进行序列化,需要循环遍历包含这些类的 Seq,并注册每个类的序列化器。

    protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    final def register(kryo: Kryo) = {
         registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }

需要修改的类需要添加一个新字段,该字段是另一个案例类的实例。

为了进行这样的更改,我们必须使用与Kryo库“optional”相关的注释。

...
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional
import scala.annotation.meta.field
...

case class ClassA(field1 : Long, field2 : String,  @(Optional @field)("field3") field3 : ClassB)

序列化程序已进行了修改,以便在读取旧的序列化类时,它可以使用默认值实例化 field3,并在写入时写入该默认值:
object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
      kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer)

 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       ClassA( 
           field1 = input.readLong(),
           field2 = input.readLong(),
           field3 = ClassB.default
       )

Kryo序列化注册也被修改为注册可选字段:
    protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    def optionals = Seq("field3")

    final def register(kryo: Kryo) = {
        optionals.foreach { optional =>
        kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) }
        registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }

作为结果,我们成功地编写了新版本的序列化类。接着,我们需要删除可选注释、修改序列化器以从新的序列化类中读取真实字段,并将可选序列化注册表移除并添加到 Seq 注册表中。
同时,我们还纠正了代码中强制使用 FieldSerializer 进行序列化的错误,但这不在问题的范围内。

0

我猜你遇到了这些麻烦,是因为现有数据使用了FieldSerializer进行序列化,这会阻止你在保持向后兼容性的同时添加新字段。

如果是这种情况,那么我发现一个有用的方法:

  1. 将所有当前数据从使用FieldSerializer进行序列化迁移到使用VersionFieldSerializer进行序列化,可以使用以下类似的方法。在此过程中,你需要在两个使用的序列化程序中注册子序列化程序。
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.VersionFieldSerializer;

public class FieldToVersionFieldMigrationSerializer<T> extends VersionFieldSerializer<T> {

  private final FieldSerializer<T> fieldSerializer;

  public FieldToVersionFieldMigrationSerializer(Kryo kryo, Class<T> type, FieldSerializer<T> fieldSerializer) {
    super(kryo, type);
    this.fieldSerializer = fieldSerializer;
  }

  @Override
  public T read(Kryo kryo, Input input, Class<T> type) {
    try {
      return super.read(kryo, input, type);
    } catch (Exception e) {
      return fieldSerializer.read(kryo, input, type);
    }
  }
}

一旦您的所有数据都迁移到支持VersionFieldSerializer,您可以引入新字段,并使用@Since进行注释以支持向后兼容性。
另外,如果您已经解决了这个问题,请告诉我们您是如何做到的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接