优雅地自动展开Spark SQL中的DataFrame

65

大家好,

是否有一种优雅且被接受的方法来展开一个具有嵌套 StructType 列的Spark SQL表(Parquet)?

例如

如果我的模式是:

foo
 |_bar
 |_baz
x
y
z

如何将其选择为扁平的表格形式,而不必手动运行

df.select("foo.bar","foo.baz","x","y","z")
换句话说,如果我只有一个StructType和一个DataFrame,那么该如何以编程方式获取上述代码的结果?

你尝试过使用 explode DataFrame 方法吗? - Daniel de Paula
2
不要认为 explode 就可以解决问题了。explode 会创建新的行,但他想要添加列。我认为你需要使用 Column 对象来处理。 - David Griffin
抱歉,是我犯了错误。 - Daniel de Paula
我的意思是,我确定我可以用 explode 做到这一点—— explode 实际上允许您创建新列。但我认为这不太优雅—— 您可能需要为每个记录执行模式反射,而不是将模式反射预先加载以仅执行一次以创建 select(...) - David Griffin
1
直接从Databricks提供的解决方案:https://github.com/delta-io/delta/blob/f72bb4147c3555b9a0f571b35ac4d9a41590f90f/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala#L123 - Tomas Bartalos
14个回答

108

简短的回答是,没有“公认”的方法来做到这一点,但您可以通过递归函数非常优雅地完成此操作,该函数通过遍历DataFrame.schema生成您的select(...)语句。

递归函数应返回一个Array[Column]。每次函数遇到StructType时,它将调用自身并将返回的Array[Column]附加到自己的Array[Column]中。

类似于:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType): Seq[Column] = schema.fields.flatMap {
  case StructField(name, inner: StructType, _, _) => allColumns(inner).map(sub => col(s"$name.$sub"))
  case StructField(name, _, _, _)                 => Seq(col(name))
}

然后您将像这样使用它:

df.select(flattenSchema(df.schema):_*)

8
使用这个解决方案,我如何处理具有相同名称的最低级子节点?例如,父元素Foo有一个名为Bar的子元素,而父元素Foz也有一个独立的名为Bar的子元素。当从初始数据帧(使用flattenSchema返回的数组)中选择Foo.Bar和Foz.Bar时,我得到了两列都命名为Bar。但我想要列标题为Foo.BarFoo_Bar或类似的东西。因此,每个列都将是唯一且明确的。 - V. Samma
1
以上解决方案适用于哪个版本的Spark?在Spark 2.1.0(Java API)中,StructField的类型似乎永远不会是StructType。 - dmux
3
TheM00s3,你需要导入org.apache.spark.sql.functions.col。在Spark 2.1.x中也可以工作(目前只在Scala中尝试过,未在Java中尝试)。 - markus
1
嗨,@David Griffin, 如果数组包含嵌套的字典,并且(不同的)字典可能嵌套一个数组,我需要如何修改此函数? - vsdaking
18
如果有其他人也遇到了这个问题:如果你希望新的列名反映原始模式的嵌套结构:f1.nested1.nested2 ...,你应该在这一行别名列:case _ => Array(col(colName)) 应该变成 case _ => Array(col(colName).alias(colName)) - b2Wc0EKKOvLPn
显示剩余9条评论

44

我想分享一下我针对Pyspark提出的解决方案——它基本上是@David Griffin方案的翻译,因此支持任何级别的嵌套对象。

from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()

我遇到了一个错误,可能是由于嵌套的JSON模式过于复杂,但我不确定它的含义是什么:“由于数据类型不匹配,无法解析'item.productOrService.coding['code']':参数2需要整数类型,但'code'是字符串类型。”有任何想法吗?我对JSON完全不熟悉,但我会怀疑结构体中的数组存在问题。 - user1983682
@user1983682 请打开您的机箱,这样我们就可以看到您带有详细信息的“模式”。 - Cloud Cho

28
我正在改进我的先前答案,并提供一个解决方案来解决在接受的答案评论中提到的问题。
这个被接受的解决方案创建了一个Column对象数组,并使用它来选择这些列。在Spark中,如果你有一个嵌套的DataFrame,你可以像这样选择子列:df.select("Parent.Child"),它将返回一个带有子列值的DataFrame,并命名为Child。但是,如果你有不同父结构的相同属性名称,你会丢失关于父结构的信息,并且可能最终得到具有相同列名称的列,无法通过名称再访问它们,因为它们是不明确的。
这就是我的问题所在。
我找到了解决我的问题的方法,也许它能帮助别人。我单独调用了flattenSchema
val flattenedSchema = flattenSchema(df.schema)

原本返回了一个由Column对象组成的数组。但是如果在select()中使用它,会返回一个以最后一级子项命名的DataFrame。我将原始列名称映射为字符串,然后在选择Parent.Child列之后,重新命名为Parent.Child而不是Child(也为了方便将点替换为下划线):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))

接着,您可以像原回答中所示使用select函数:

var newDf = df.select(renamedCols:_*)

谢谢@V.Samma,我已经用这个解决了我的问题,但是它创建了一个非常宽的数据框,实际上我需要将嵌套结构类型作为新行添加到我的数据框中。如果您有任何建议,将不胜感激。 - ukbaz
感谢您的回复@V.Samma。根据您的示例,我得到了以下内容:“ID”,“Person.Name”,“Person.Age”,“Address.City”,“Address.Street”,“Address.Country”,“ID1”,“Person.Name1”,“Person.Age1”,“Address.City1”,“Address.Street1”,“Address.Country1”,“ID2”,“Person.Name2”,“Person.Age2”,“Address.City2”,“Address.Street2”,“Address.Country2”...等等。我想要的是将这些新列作为数据帧中的行,因此“ID1”和“ID2”的数据将在ID列下面。谢谢。 - ukbaz
@ukbaz 我不理解这是怎么可能的。你的最初的、未压平的数据/模式是什么样子?对于我的例子,这个最初的模式和发布的解决方案一起产生了6列。每行数据仍然是单独的一行数据,但现在包含单个值而不是对象。 - V. Samma
@ukbaz 嗯,我的解决方案在这里是正确的。那将是您模式的预期输出。您将它们作为不同的结构体,不能指望将模式展平也会神奇地合并一些列并添加相应的值作为单独的行。结构体名称被定义为数字?这本身就是一个问题。但是它们各自的模式似乎不相似,因此您可以将它们合并。我建议您尝试在手头获得更好的格式化数据,以便您具有正确命名的列/结构体和清晰的概述,而不是不同结构体值中的重复日期等。 - V. Samma
1
很遗憾,我已经有几年没有使用Spark了,但看起来你已经得到了帮助 :) - V. Samma
显示剩余5条评论

5

我为开源的spark-daria项目添加了一个DataFrame#flattenSchema方法。

以下是如何在您的代码中使用此功能。

import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()

+-------+-------+---------+----+---+
|foo.bar|foo.baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

您还可以使用flattenSchema()方法指定不同的列名称分隔符。

df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

这个分隔符参数非常重要。如果您想要将模式展平以加载到Redshift表中,您将无法使用句点作为分隔符。

这是生成此输出的完整代码片段。

val data = Seq(
  Row(Row("this", "is"), "something", "cool", ";)")
)

val schema = StructType(
  Seq(
    StructField(
      "foo",
      StructType(
        Seq(
          StructField("bar", StringType, true),
          StructField("baz", StringType, true)
        )
      ),
      true
    ),
    StructField("x", StringType, true),
    StructField("y", StringType, true),
    StructField("z", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

df.flattenSchema().show()

如果您不想将spark-daria依赖项添加到项目中,那么底层代码与David Griffin的代码类似。

object StructTypeHelpers {

  def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
    schema.fields.flatMap(structField => {
      val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
      val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

      structField.dataType match {
        case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
        case _ => Array(col(codeColName).alias(colName))
      }
    })
  }

}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
      df.select(
        StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
      )
    }

  }

}

2
我们能否添加对Array<Struct>和Array的支持? - Sampat Kumar

4

以下是更复杂模式的额外处理方式:https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44

当您的字段名称包含特殊字符如点 '.',连字符 '-' 等时,PySpark 可以解决这些问题。

from pyspark.sql.types import StructType, ArrayType  

def normalise_field(raw):
    return raw.strip().lower() \
            .replace('`', '') \
            .replace('-', '_') \
            .replace(' ', '_') \
            .strip('_')

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(col(name).alias(normalise_field(name)))

    return fields

df.select(flatten(df.schema)).show()

3
您还可以使用SQL将列选择为平面格式。
  1. 获取原始数据框架模式
  2. 浏览模式生成SQL字符串
  3. 查询原始数据帧
我在Java中进行了实现:https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48 (也可以使用递归方法,但我更喜欢使用SQL方式,这样您可以通过 Spark-shell 轻松测试它)。

3
这是解决方案的修改版本,但它使用了tailrec符号。最初的回答。

  @tailrec
  def flattenSchema(
      splitter: String,
      fields: List[(StructField, String)],
      acc: Seq[Column]): Seq[Column] = {
    fields match {
      case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
        val newPrefix = s"$prefix${field.name}."
        val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
        flattenSchema(splitter, tail ++ newFields, acc)

      case (field, prefix) :: tail =>
        val colName = s"$prefix${field.name}"
        val newCol  = col(colName).as(colName.replace(".", splitter))
        flattenSchema(splitter, tail, acc :+ newCol)

      case _ => acc
    }
  }
  def flattenDataFrame(df: DataFrame): DataFrame = {
    val fields = df.schema.fields.map((_, ""))
    df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
  }

这个flatten dataframe的tailrec特性对于结构类型的dataframe非常有效。您能否添加一个处理具有explode功能的数组类型的案例?这对我来说将非常有帮助。提前致谢。 - ungalVicky

3
这里有一个函数可以满足您的需求,可以处理包含具有相同名称和前缀的列的多个嵌套列:
from pyspark.sql import functions as F

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

在此之前:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)
 |-- bar: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)

之后:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo_a: float (nullable = true)
 |-- foo_b: float (nullable = true)
 |-- foo_c: integer (nullable = true)
 |-- bar_a: float (nullable = true)
 |-- bar_b: float (nullable = true)
 |-- bar_c: integer (nullable = true)

3

结合David Griffen和V. Samma的答案,您可以执行以下操作以展平并避免重复列名:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName).as(colName.replace(".","_")))
    }
  })
}

def flattenDataFrame(df:DataFrame): DataFrame = {
    df.select(flattenSchema(df.schema):_*)
}

var my_flattened_json_table = flattenDataFrame(my_json_table)

2

对于上面的代码,如果你正在使用嵌套结构和数组,则需要进行一些小的修改。

最初的回答:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
    schema.fields.flatMap(f => {
      val colName = if (prefix == null) f.name else (prefix + "." + f.name)

      f match {
        case StructField(_, struct:StructType, _, _) => flattenSchema(struct, colName)
        case StructField(_, ArrayType(x :StructType, _), _, _) => flattenSchema(x, colName)
        case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
        case _ => Array(col(colName))
      }
    })
  }


我正在尝试将Evan V提供的Spark建议中的逻辑实现到代码中,但似乎无法正确处理数组类型中的结构体--如果有人有想法,我将不胜感激。 - user1983682
我们可以在展平模式时添加扫描深度吗? - Sampat Kumar
我正在尝试使用它,但它没有给出正确的输入。我有一个a、a.b、a.b.c、a.b.d,但它没有对最后一级子节点进行展平。 - NickyPatel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接