如何使用Java在Spark数据集中编码可选字段?

5

我希望在使用数据集中的类的字段时,不要使用null值。我尝试使用scala的Option和java的Optional,但都失败了:

    @AllArgsConstructor // lombok
    @NoArgsConstructor  // mutable type is required in java :(
    @Data               // see https://stackoverflow.com/q/59609933/1206998
    public static class TestClass {
        String id;
        Option<Integer> optionalInt;
    }

    @Test
    public void testDatasetWithOptionField(){
        Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
                new TestClass("item 1", Option.apply(1)),
                new TestClass("item .", Option.empty())
        ), Encoders.bean(TestClass.class));

        ds.collectAsList().forEach(x -> System.out.println("Found " + x));
    }

在运行时出现错误信息:File 'generated.java',第77行,第47列:无法实例化抽象类"scala.Option"


问题:有没有一种方法可以在Java中对可选字段进行编码而不使用null,在数据集中?

子问题:顺便问一下,在Scala中是否可以编码包含Option字段的case class?


注意:这用于中间数据集,即不读也不写的东西(但用于Spark内部序列化)

2个回答

4

这在Scala中实现起来相当简单。

Scala实现

import org.apache.spark.sql.{Encoders, SparkSession}

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Stack-scala")
      .master("local[2]")
      .getOrCreate()

    val ds = spark.createDataset(Seq(
      TestClass("Item 1", Some(1)),
      TestClass("Item 2", None)
    ))( Encoders.product[TestClass])

    ds.collectAsList().forEach(println)

    spark.stop()
  }

  case class TestClass(
    id: String,
    optionalInt: Option[Int] )
}

Java

Java中有各种Option类可用,但没有一个可以直接使用。

  1. java.util.Optional不可序列化
  2. scala.Option -> 可序列化但是抽象的,因此当CodeGenerator生成以下代码时,会失败!
/* 081 */         // initializejavabean(newInstance(class scala.Option))
/* 082 */         final scala.Option value_9 = false ?
/* 083 */         null : new scala.Option();  // ---> Such initialization is not possible for abstract classes
/* 084 */         scala.Option javaBean_1 = value_9;
  1. org.apache.spark.api.java.Optional -> Spark 实现的 Optional 类型,可序列化但具有私有构造函数。因此,当使用零个实际参数时,会出现错误:No applicable constructor/method found for zero actual parameters。由于这是一个 final 类,无法对其进行扩展。
/* 081 */         // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */         final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */         null : new org.apache.spark.api.java.Optional();
/* 084 */         org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */         if (!false) {

1
你描述了问题,但是并没有提供解决方案。 - Juh_

1

一种选择是在数据类中使用普通的Java Optional,然后使用Kryo作为序列化器。

Encoder en = Encoders.kryo(TestClass.class);

Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
        new TestClass("item 1", Optional.of(1)),
        new TestClass("item .", Optional.empty())
), en);

ds.collectAsList().forEach(x -> System.out.println("Found " + x));

输出:

Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)

使用Kryo存在一个缺点:该编码器以二进制格式进行编码:
ds.printSchema();
ds.show(false);

打印

root
 |-- value: binary (nullable = true)

+-------------------------------------------------------------------------------------------------------+
|value                                                                                                  |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00]   |
+-------------------------------------------------------------------------------------------------------+

一个基于udf的解决方案可以获取使用Kryo编码的数据集的正常输出列,请参考此答案

也许有点离题,但可能找到长期解决方案的起点是查看JavaTypeInference的代码。 serializerFordeserializerFor方法由ExpressionEncoder.javaBean使用,用于创建Java bean编码器的序列化程序和反序列化程序部分。
在这个模式匹配块
typeToken.getRawType match {
   case c if c == classOf[String] => createSerializerForString(inputObject)
   case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
   case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
   case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
   case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
   [...]

这里缺少对于java.util.Optional的处理。可以在此处以及相应的反序列化方法中添加。这将允许Java bean具有类型为Optional的属性。


正如您所提到的,使用kryo的问题在于我会失去列编码,这将使得Dataset的主要用途失效(直接使用RDD会更好)。因此,我想我不会使用它。但目前来看,这仍然是最好的答案。 - Juh_
我喜欢 Spark 代码中需要打补丁的开放性。有人想提交 PR 吗?(我觉得我不会有时间 :) ?) - Juh_
我认为java.util.Optional的实现缺失了,因为它不可序列化,而且Java开发人员决定不追求它,因为这意味着更多的维护。正是基于这个非常特殊的原因,Spark添加了一个额外的实现作为org.apache.spark.api.java.Optional - Amit Singh
1
好的,就像你的回答所示,使用org.apache.spark.api.java.Optional也无法解决问题。此外,这里的目标是将序列化的null值替换为Optional.empty,反之亦然,将序列化的Optional.empty替换为null或存储文件中的等效值。不要存储Optional的序列化。 - Juh_

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接