这取决于列的类型。让我们从一些虚拟数据开始:
import org.apache.spark.sql.functions.{udf, lit}
import scala.util.Try
case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
an_array: Array[Int], a_map: Map[String, String],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
Array(
ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema
数组(ArrayType
)列:
Column.getItem
方法
df.select($"an_array".getItem(1)).show
// +-----------+
// |an_array[1]|
// +-----------+
// | 2|
// | 5|
// +-----------+
Hive括号语法:
sqlContext.sql("SELECT an_array[1] FROM df").show
// +---+
// |_c0|
// +---+
// | 2|
// | 5|
// +---+
一个UDF。val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)
df.select(get_ith($"an_array", lit(1))).show
// +---------------+
// |UDF(an_array,1)|
// +---------------+
// | 2|
// | 5|
// +---------------+
此外,除了上述列出的方法之外,Spark还支持对复合类型进行操作的日益增长的内置函数列表。值得注意的示例包括高阶函数,如transform
(SQL 2.4+、Scala 3.0+、PySpark/SparkR 3.1+)。df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
import org.apache.spark.sql.functions.transform
df.select(transform($"an_array", x => x + 1) as "an_array_inc").show
筛选器
(SQL 2.4+,Scala 3.0+,Python / SparkR 3.1+)
df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
import org.apache.spark.sql.functions.filter
df.select(filter($"an_array", x => x % 2 === 0) as "an_array_even").show
aggregate
(SQL 2.4+,Scala 3.0+,PySpark / SparkR 3.1+):
df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
// +------------+
// |an_array_sum|
// +------------+
// | 6|
// | 15|
// +------------+
import org.apache.spark.sql.functions.aggregate
df.select(aggregate($"an_array", lit(0), (x, y) => x + y) as "an_array_sum").show
// +------------+
// |an_array_sum|
// +------------+
// | 6|
// | 15|
// +------------+
数组处理函数(array_*
),例如 array_distinct
(2.4+):
import org.apache.spark.sql.functions.array_distinct
df.select(array_distinct($"an_array_of_structs.vals"(0))).show
array_max
(array_min
, 2.4+):
import org.apache.spark.sql.functions.array_max
df.select(array_max($"an_array")).show
// +-------------------+
// |array_max(an_array)|
// +-------------------+
// | 3|
// | 6|
// +-------------------+
flatten
(2.4+)
import org.apache.spark.sql.functions.flatten
df.select(flatten($"an_array_of_structs.vals")).show
arrays_zip
(2.4+):
import org.apache.spark.sql.functions.arrays_zip
df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
array_union
(2.4+):
import org.apache.spark.sql.functions.array_union
df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
slice
(2.4+):
import org.apache.spark.sql.functions.slice
df.select(slice($"an_array", 2, 2)).show
映射(MapType
)列
df.select($"a_map".getField("foo")).show
使用Hive括号语法:
sqlContext.sql("SELECT a_map['foz'] FROM df").show
// +
// | _c0|
// +
// |null|
// | baz|
// +
使用点语法和完整路径:
df.select($"a_map.foo").show
使用用户自定义函数(UDF)
val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))
df.select(get_field($"a_map", lit("foo"))).show
不断增加的
map_*
函数,如
map_keys
(2.3+)
import org.apache.spark.sql.functions.map_keys
df.select(map_keys($"a_map")).show
// +---------------+
// |map_keys(a_map)|
// +---------------+
// | [foo]|
// | [foz]|
// +---------------+
或者 map_values
(2.3+)
import org.apache.spark.sql.functions.map_values
df.select(map_values($"a_map")).show
// +-----------------+
// |map_values(a_map)|
// +-----------------+
// | [bar]|
// | [baz]|
// +-----------------+
请查看SPARK-23899获取详细列表。
使用点语法的完整路径来操作结构体 (StructType
) 列:
df.select($"a_struct.x").show
使用原始的SQL语句
sqlContext.sql("SELECT a_struct.x FROM df").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
可以使用点语法、名称和标准的Column
方法访问structs
数组内的字段:
df.select($"an_array_of_structs.foo").show
// +----------+
// | foo|
// +----------+
// |[foo, bar]|
// |[foz, baz]|
// +----------+
sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
// +---+
// |_c0|
// +---+
// |foo|
// |foz|
// +---+
df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show
// +------------------------------+
// |an_array_of_structs.vals[1][1]|
// +------------------------------+
// | 4.0|
// | 8.0|
// +------------------------------+
用户定义类型(UDT)字段可以通过UDF进行访问。有关详细信息,请参见Spark SQL referencing attributes of UDT。
注意事项:
df.select(explode($"an_array_of_structs")).show
使用通配符(*
)可以与点语法结合使用,选择(可能多个)字段而不需要明确指定名称:
df.select($"a_struct.*").show
使用get_json_object
和from_json
函数可以查询JSON列。有关详细信息,请参见如何使用Spark DataFrames查询JSON数据列?
map[hello]
不起作用的原因是键是字符串字段,因此必须加引号:map['hello']
。 - Tristan Reid