Spark默认的空列DataSet

4

我无法让Spark将json(或csv)作为一个带有Option[_]字段的case类的Dataset读取,其中并非所有字段在源中都被定义。

有点晦涩,但是假设我有一个名为CustomData的case类。

给定以下json文件(customA.json):

{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789,              "colB": "a"}

以下是代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()

输出结果如预期一样:
+----+----+---+
|colA|colB| id|
+----+----+---+
|   x|   z|123|
|   y|null|456|
|null|   a|789|
+----+----+---+

虽然不是所有的列都必须被定义。 但是如果我想使用相同的代码来读取一个没有某个列出现的文件,我无法实现这一点:

对于另一个json文件(customB.json):

{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}

还有额外的代码:

  val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()

输出结果为错误信息:
org.apache.spark.sql.AnalysisException: 给定输入列[colA, id]无法解析'colB';
这很有道理,但我希望能够重复使用同一种情况类来处理两个文件。特别是如果在摄取之前我不知道colB是否出现在JSON文件中。
当然,我可以进行检查,但是否有一种方法将不存在的列转换为null(与customA.json一样)?将readmode设置为Permissive似乎没有改变任何内容。
我错过了什么吗?
2个回答

4
我将在这里提供一个答案。为了展示一些(有点)可行但我认为看起来很hacky的东西,我们可以通过扩展DataFrame的方法,在已存在的StructType之上强制使用case类的StructType来实现它,但是可能(我真的希望)有更好/更清晰的解决方案。
以下是代码:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {

  def forceMergeSchema[T: TypeTag]: DataFrame = {
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame){
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}

val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()

现在展示我期望的结果:
+----+---+----+
|colA| id|colB|
+----+---+----+
|   x|321|null|
|   y|654|null|
|null|987|null|
+----+---+----+

我只试过用标量类型(像Int、String等)来做到这一点,我认为更复杂的结构会失败得很惨。所以我还在寻找更好的答案。


1
你有没有找到更好的解决方案?我也处于类似的情况。 - Yeikel
并不是非常完美,但这个解决方案效果还不错。我使用了单个 map/select 代替 foldLeft/withColumn,切换到了 Encoders.product[T].schema 而不是 ScalaReflection.schemaFor[T],并将 dataFrame.columns 移到了一个惰性变量上以避免重新评估。 - Tom Lous

1
这是一个更简单的解决方案:

    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.catalyst.ScalaReflection
    import scala.reflect.runtime.universe._

val structSchema = ScalaReflection.schemaFor[CustomData].dataType.asInstanceOf[StructType]
val df = spark.read.schema(structSchema).json(jsonRDD)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接