如何向DataFrame添加一个新的结构列

25

我目前正在尝试从MongoDB中提取数据库,并使用Spark将其导入到ElasticSearch中,并使用geo_points

Mongo数据库具有纬度和经度值,但ElasticSearch要求将它们转换为geo_point 类型。

在Spark中是否有一种方法可以将latlon 列复制到一个新列中,该列是一个arraystruct

感谢任何帮助!

2个回答

63

我假设您开始使用类似于以下的平面模式:

root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)

首先让我们创建示例数据:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) ::
    StructField("key", StringType, false) ::Nil)

val df = sqlContext.createDataFrame(rdd, schema)

一种简单方法是使用 UDF 和 case class:

case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
   withColumn("location", makeLocation(col("lat"), col("long"))).
   drop("lat").
   drop("long")

dfRes.printSchema

然后我们得到

root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)

一种较为困难的方法是先转换您的数据,然后再应用模式:

val rddRes = df.
    map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), true) :: Nil 
)

sqlContext.createDataFrame(rddRes, schemaRes).show

然后我们得到了预期的输出

+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+

从头开始创建嵌套模式可能很繁琐,因此如果可能的话,我建议采用第一种方法。如果需要更复杂的结构,它可以轻松扩展:

case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))

df.
    withColumn("pin", makePin(col("lat"), col("long"))).
    drop("lat").
    drop("long").
    printSchema

然后我们会得到预期的输出:

root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)

很遗憾,您无法控制nullable字段,因此如果它对您的项目很重要,您将不得不指定模式。

最后,您可以使用在1.4中引入的struct函数:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))

感谢 @zero323 提供如此详尽的答案!这非常有帮助。您知道我怎么能够递归地为嵌套类型进行映射吗?这个数据比我想象的要难看。 - Kim Ngo
我看不出来你为什么不能。 - zero323
嗨@zero323 - 你知道如果新结构体中有超过10列,是否有任何方法可以使用你的UDF方法创建结构体吗?UDF似乎在10个输入变量上有限制。 - Patrick McGloin
@PatrickMcGloin 这个链接 https://dev59.com/pprga4cB1Zd3GeqPrsE- 有帮助吗? - zero323
1
嗨@zero323。其实你上次提供的“struct”函数建议对我很有帮助。我应该先读完最后面! - Patrick McGloin
@zero323 有什么建议可以在Spark中解决这个问题吗... https://stackoverflow.com/questions/62933135/dataframe-look-up-and-optimization - BdEngineer

7

试试这个:

import org.apache.spark.sql.functions._

df.registerTempTable("dt")

dfres = sql("select struct(lat,lon) as colName from dt")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接