这里没有什么意外。您试图使用用Spark 1.x编写的代码,该代码在Spark 2.0中不再受支持:
- 在1.x中,
DataFrame.map
是 ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
- 在2.x中,
Dataset[Row].map
是 ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
说实话,在1.x中它也没有太多意义。无论哪个版本,您都可以简单地使用 DataFrame
API:
import org.apache.spark.sql.functions.{when, lower}
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
如果你真的想使用map
,那么你应该使用静态类型的Dataset
:
import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
case rec => rec
}
或者至少返回一个具有隐式编码器的对象:
df.map {
case Row(year: Int, make: String, model: String) =>
(year, if(make.toLowerCase == "tesla") "S" else make, model)
}
最后,如果因为某些非常疯狂的原因你真的想对Dataset[Row]
进行映射,你必须提供所需的编码器:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val schema = StructType(Seq(
StructField("year", IntegerType),
StructField("make", StringType),
StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
Row(year, "S", model)
case row => row
} (encoder)
spark.implicits._
。 - Yuval Itzchakov