大家好,
是否有一种优雅且被接受的方法来展开一个具有嵌套 StructType
列的Spark SQL表(Parquet)?
例如
如果我的模式是:
foo
|_bar
|_baz
x
y
z
如何将其选择为扁平的表格形式,而不必手动运行
df.select("foo.bar","foo.baz","x","y","z")
换句话说,如果我只有一个StructType
和一个DataFrame
,那么该如何以编程方式获取上述代码的结果?大家好,
是否有一种优雅且被接受的方法来展开一个具有嵌套 StructType
列的Spark SQL表(Parquet)?
例如
如果我的模式是:
foo
|_bar
|_baz
x
y
z
如何将其选择为扁平的表格形式,而不必手动运行
df.select("foo.bar","foo.baz","x","y","z")
换句话说,如果我只有一个StructType
和一个DataFrame
,那么该如何以编程方式获取上述代码的结果?简短的回答是,没有“公认”的方法来做到这一点,但您可以通过递归函数非常优雅地完成此操作,该函数通过遍历DataFrame.schema
生成您的select(...)
语句。
递归函数应返回一个Array[Column]
。每次函数遇到StructType
时,它将调用自身并将返回的Array[Column]
附加到自己的Array[Column]
中。
类似于:
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.functions.col
def flattenSchema(schema: StructType): Seq[Column] = schema.fields.flatMap {
case StructField(name, inner: StructType, _, _) => allColumns(inner).map(sub => col(s"$name.$sub"))
case StructField(name, _, _, _) => Seq(col(name))
}
然后您将像这样使用它:
df.select(flattenSchema(df.schema):_*)
flattenSchema
返回的数组)中选择Foo.Bar和Foz.Bar时,我得到了两列都命名为Bar。但我想要列标题为Foo.Bar
或Foo_Bar
或类似的东西。因此,每个列都将是唯一且明确的。 - V. Sammaf1.nested1.nested2 ...
,你应该在这一行别名列:case _ => Array(col(colName))
应该变成 case _ => Array(col(colName).alias(colName))
。 - b2Wc0EKKOvLPn我想分享一下我针对Pyspark提出的解决方案——它基本上是@David Griffin方案的翻译,因此支持任何级别的嵌套对象。
from pyspark.sql.types import StructType, ArrayType
def flatten(schema, prefix=None):
fields = []
for field in schema.fields:
name = prefix + '.' + field.name if prefix else field.name
dtype = field.dataType
if isinstance(dtype, ArrayType):
dtype = dtype.elementType
if isinstance(dtype, StructType):
fields += flatten(dtype, prefix=name)
else:
fields.append(name)
return fields
df.select(flatten(df.schema)).show()
item
.productOrService
.coding
['code']':参数2需要整数类型,但'code'是字符串类型。”有任何想法吗?我对JSON完全不熟悉,但我会怀疑结构体中的数组存在问题。 - user1983682df.select("Parent.Child")
,它将返回一个带有子列值的DataFrame,并命名为Child。但是,如果你有不同父结构的相同属性名称,你会丢失关于父结构的信息,并且可能最终得到具有相同列名称的列,无法通过名称再访问它们,因为它们是不明确的。flattenSchema
:val flattenedSchema = flattenSchema(df.schema)
原本返回了一个由Column对象组成的数组。但是如果在select()
中使用它,会返回一个以最后一级子项命名的DataFrame。我将原始列名称映射为字符串,然后在选择Parent.Child
列之后,重新命名为Parent.Child
而不是Child
(也为了方便将点替换为下划线):
val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))
接着,您可以像原回答中所示使用select函数:
var newDf = df.select(renamedCols:_*)
我为开源的spark-daria项目添加了一个DataFrame#flattenSchema
方法。
以下是如何在您的代码中使用此功能。
import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()
+-------+-------+---------+----+---+
|foo.bar|foo.baz| x| y| z|
+-------+-------+---------+----+---+
| this| is|something|cool| ;)|
+-------+-------+---------+----+---+
您还可以使用flattenSchema()
方法指定不同的列名称分隔符。
df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz| x| y| z|
+-------+-------+---------+----+---+
| this| is|something|cool| ;)|
+-------+-------+---------+----+---+
这个分隔符参数非常重要。如果您想要将模式展平以加载到Redshift表中,您将无法使用句点作为分隔符。
这是生成此输出的完整代码片段。
val data = Seq(
Row(Row("this", "is"), "something", "cool", ";)")
)
val schema = StructType(
Seq(
StructField(
"foo",
StructType(
Seq(
StructField("bar", StringType, true),
StructField("baz", StringType, true)
)
),
true
),
StructField("x", StringType, true),
StructField("y", StringType, true),
StructField("z", StringType, true)
)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(schema)
)
df.flattenSchema().show()
如果您不想将spark-daria依赖项添加到项目中,那么底层代码与David Griffin的代码类似。
object StructTypeHelpers {
def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
schema.fields.flatMap(structField => {
val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name
structField.dataType match {
case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
case _ => Array(col(codeColName).alias(colName))
}
})
}
}
object DataFrameExt {
implicit class DataFrameMethods(df: DataFrame) {
def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
df.select(
StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
)
}
}
}
以下是更复杂模式的额外处理方式:https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44
当您的字段名称包含特殊字符如点 '.',连字符 '-' 等时,PySpark 可以解决这些问题。
from pyspark.sql.types import StructType, ArrayType
def normalise_field(raw):
return raw.strip().lower() \
.replace('`', '') \
.replace('-', '_') \
.replace(' ', '_') \
.strip('_')
def flatten(schema, prefix=None):
fields = []
for field in schema.fields:
name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
dtype = field.dataType
if isinstance(dtype, ArrayType):
dtype = dtype.elementType
if isinstance(dtype, StructType):
fields += flatten(dtype, prefix=name)
else:
fields.append(col(name).alias(normalise_field(name)))
return fields
df.select(flatten(df.schema)).show()
@tailrec
def flattenSchema(
splitter: String,
fields: List[(StructField, String)],
acc: Seq[Column]): Seq[Column] = {
fields match {
case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
val newPrefix = s"$prefix${field.name}."
val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
flattenSchema(splitter, tail ++ newFields, acc)
case (field, prefix) :: tail =>
val colName = s"$prefix${field.name}"
val newCol = col(colName).as(colName.replace(".", splitter))
flattenSchema(splitter, tail, acc :+ newCol)
case _ => acc
}
}
def flattenDataFrame(df: DataFrame): DataFrame = {
val fields = df.schema.fields.map((_, ""))
df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
}
from pyspark.sql import functions as F
def flatten_df(nested_df):
flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(flat_cols +
[F.col(nc+'.'+c).alias(nc+'_'+c)
for nc in nested_cols
for c in nested_df.select(nc+'.*').columns])
return flat_df
在此之前:
root
|-- x: string (nullable = true)
|-- y: string (nullable = true)
|-- foo: struct (nullable = true)
| |-- a: float (nullable = true)
| |-- b: float (nullable = true)
| |-- c: integer (nullable = true)
|-- bar: struct (nullable = true)
| |-- a: float (nullable = true)
| |-- b: float (nullable = true)
| |-- c: integer (nullable = true)
之后:
root
|-- x: string (nullable = true)
|-- y: string (nullable = true)
|-- foo_a: float (nullable = true)
|-- foo_b: float (nullable = true)
|-- foo_c: integer (nullable = true)
|-- bar_a: float (nullable = true)
|-- bar_b: float (nullable = true)
|-- bar_c: integer (nullable = true)
结合David Griffen和V. Samma的答案,您可以执行以下操作以展平并避免重复列名:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName).as(colName.replace(".","_")))
}
})
}
def flattenDataFrame(df:DataFrame): DataFrame = {
df.select(flattenSchema(df.schema):_*)
}
var my_flattened_json_table = flattenDataFrame(my_json_table)
对于上面的代码,如果你正在使用嵌套结构和数组,则需要进行一些小的修改。
最初的回答:
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f match {
case StructField(_, struct:StructType, _, _) => flattenSchema(struct, colName)
case StructField(_, ArrayType(x :StructType, _), _, _) => flattenSchema(x, colName)
case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
case _ => Array(col(colName))
}
})
}
explode
DataFrame 方法吗? - Daniel de Paulaexplode
就可以解决问题了。explode
会创建新的行,但他想要添加列。我认为你需要使用Column
对象来处理。 - David Griffinexplode
做到这一点——explode
实际上允许您创建新列。但我认为这不太优雅—— 您可能需要为每个记录执行模式反射,而不是将模式反射预先加载以仅执行一次以创建select(...)
。 - David Griffin