我目前正在尝试从MongoDB中提取数据库,并使用Spark将其导入到ElasticSearch中,并使用geo_points
。
Mongo数据库具有纬度和经度值,但ElasticSearch要求将它们转换为geo_point
类型。
在Spark中是否有一种方法可以将lat
和lon
列复制到一个新列中,该列是一个array
或struct
?
感谢任何帮助!
我目前正在尝试从MongoDB中提取数据库,并使用Spark将其导入到ElasticSearch中,并使用geo_points
。
Mongo数据库具有纬度和经度值,但ElasticSearch要求将它们转换为geo_point
类型。
在Spark中是否有一种方法可以将lat
和lon
列复制到一个新列中,该列是一个array
或struct
?
感谢任何帮助!
我假设您开始使用类似于以下的平面模式:
root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)
首先让我们创建示例数据:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._
val rdd = sc.parallelize(
Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)
val schema = StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) ::
StructField("key", StringType, false) ::Nil)
val df = sqlContext.createDataFrame(rdd, schema)
一种简单方法是使用 UDF 和 case class:
case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
val dfRes = df.
withColumn("location", makeLocation(col("lat"), col("long"))).
drop("lat").
drop("long")
dfRes.printSchema
然后我们得到
root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
| |-- lat: double (nullable = false)
| |-- long: double (nullable = false)
一种较为困难的方法是先转换您的数据,然后再应用模式:
val rddRes = df.
map{case Row(lat, long, key) => Row(key, Row(lat, long))}
val schemaRes = StructType(
StructField("key", StringType, false) ::
StructField("location", StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) :: Nil
), true) :: Nil
)
sqlContext.createDataFrame(rddRes, schemaRes).show
然后我们得到了预期的输出
+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+
从头开始创建嵌套模式可能很繁琐,因此如果可能的话,我建议采用第一种方法。如果需要更复杂的结构,它可以轻松扩展:
case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))
df.
withColumn("pin", makePin(col("lat"), col("long"))).
drop("lat").
drop("long").
printSchema
然后我们会得到预期的输出:
root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: double (nullable = false)
| | |-- long: double (nullable = false)
很遗憾,您无法控制nullable
字段,因此如果它对您的项目很重要,您将不得不指定模式。
最后,您可以使用在1.4中引入的struct
函数:
import org.apache.spark.sql.functions.struct
df.select($"key", struct($"lat", $"long").alias("location"))
试试这个:
import org.apache.spark.sql.functions._
df.registerTempTable("dt")
dfres = sql("select struct(lat,lon) as colName from dt")