我是新手,对于Spark和Scala并不熟悉。
我们有一个外部数据源提供JSON。在这个JSON中,所有值,包括数字和布尔字段都有引号。因此,当我将其加载到DataFrame中时,所有列都变成了字符串。最终的目标是将这些JSON记录转换为适当类型的Parquet文件。
大约有100个字段,我需要将其中几个类型从字符串更改为int、boolean或bigint(long)。此外,我们处理的每个DataFrame只会有一部分这些字段,而不是全部。因此,我需要能够处理给定DataFrame的子集列,并将每个列与已知的列类型列表进行比较,根据出现在DataFrame中的列将某些列从字符串转换为int、bigint和boolean。
最后,我需要可配置的列类型列表,因为未来我们将有新的列,并可能要删除或更改旧的列。
接下来是我的进展:
我们有一个外部数据源提供JSON。在这个JSON中,所有值,包括数字和布尔字段都有引号。因此,当我将其加载到DataFrame中时,所有列都变成了字符串。最终的目标是将这些JSON记录转换为适当类型的Parquet文件。
大约有100个字段,我需要将其中几个类型从字符串更改为int、boolean或bigint(long)。此外,我们处理的每个DataFrame只会有一部分这些字段,而不是全部。因此,我需要能够处理给定DataFrame的子集列,并将每个列与已知的列类型列表进行比较,根据出现在DataFrame中的列将某些列从字符串转换为int、bigint和boolean。
最后,我需要可配置的列类型列表,因为未来我们将有新的列,并可能要删除或更改旧的列。
接下来是我的进展:
// first I convert to all lower case for column names
val df = dfIn.toDF(dfIn.columns map(_.toLowerCase): _*)
// Big mapping to change types
// TODO how would I make this configurable?
// I'd like to drive this list from an external config file.
val dfOut = df.select(
df.columns.map {
///// Boolean
case a @ "a" => df(a).cast(BooleanType).as(a)
case b @ "b" => df(b).cast(BooleanType).as(b)
///// Integer
case i @ "i" => df(i).cast(IntegerType).as(i)
case j @ "j" => df(j).cast(IntegerType).as(j)
// Bigint to Double
case x @ "x" => df(x).cast(DoubleType).as(x)
case y @ "y" => df(y).cast(DoubleType).as(y)
case other => df(other)
}: _*
)
这是一种高效的将数据转换为我在Scala中想要的类型的方法吗?
我需要一些建议,如何从外部的“配置”文件驱动它,我可以在其中定义列类型。