尝试将数据帧行映射到更新行时出现编码器错误。

43

当我尝试在我的代码中执行以下操作时

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

我从这里引用了上面的参考资料:Scala: How can I replace value in Dataframs using scala 但是我遇到了编码器错误:

无法找到存储在数据集中的类型的编码器。 基本类型(Int、String等)和产品类型(case类)通过导入spark implicits._ 支持序列化其他类型的支持将在未来版本中添加。

注意:我正在使用Spark 2.0!


5
你需要导入 spark.implicits._ - Yuval Itzchakov
6
谢谢@Yuval,但是没有起作用。 - Advika
4个回答

89

这里没有什么意外。您试图使用用Spark 1.x编写的代码,该代码在Spark 2.0中不再受支持:

  • 在1.x中,DataFrame.map((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • 在2.x中,Dataset[Row].map((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

说实话,在1.x中它也没有太多意义。无论哪个版本,您都可以简单地使用 DataFrame API:

import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

如果你真的想使用map,那么你应该使用静态类型的Dataset

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

或者至少返回一个具有隐式编码器的对象:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}

最后,如果因为某些非常疯狂的原因你真的想对Dataset[Row]进行映射,你必须提供所需的编码器:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)

@Advika 这可能是你能选择的最糟糕的一个(实际上,在这里处理“Row”/“Any”没有什么好的理由),但我很高兴它有所帮助。 - zero323
感谢@zero323。在进行自我评估后,我意识到我迄今为止采取的方法打败了Spark的所有目的。我需要想出更好的逻辑。 :) - Advika
@zero323 为什么在 Dataset[Row] 上进行映射是“完全疯狂”的?实际上,我有一个使用案例,需要对 Dataset[Row] 进行 flatMap。 - Jane Wayne
9
由于以下原因,如果您不使用DataFrame和二进制编码器(Encoders),则会出现问题:a)您无法获得DataFrame和二进制编码器提供的性能增益;b)您无法获得类型安全性;c)您需要显式匹配每种类型,这使得代码冗长且容易出错。d)您必须为编码器指定模式,再次冗长且容易出错。对于类似于Dataframe上的flatMap的操作,“explode”通常已经足够了。 - zero323
@zero323,就 map 本身的代码而言,我认为在隐式编码器中你需要明确匹配每种类型,而在最后一个示例中,你仅针对用于逻辑的属性指定类型。 - Dr Y Wit

5

如果数据框架模式是预先知道的情况下,@zero323给出的答案是解决方案。

但对于动态模式/或将多个数据框架传递给通用函数的情况:

在从1.6.1迁移到2.2.0时,以下代码对我们起作用。

import org.apache.spark.sql.Row

val df = Seq(
   (2012, "Tesla", "S"), (1997, "Ford", "E350"),
   (2015, "Chevy", "Volt")
 ).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

这段代码可以在Spark的所有版本上运行。

缺点:Spark在DataFrame/Dataset API上提供的优化将不会被应用。


3

为了更好地理解其他回答(尤其是@zero323关于在Dataset[Row]上使用map的最终观点),还有一些其他重要的知识点需要了解:

  • 首先,Dataframe.map会给你一个Dataset(更具体地说,是Dataset[T],而不是Dataset[Row])!
  • Dataset[T]总是需要编码器,这就是这句话“Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]”的意思。
  • 实际上,Spark已经预定义了很多编码器(可以通过执行import spark.implicits._import),但这个列表仍然无法涵盖开发人员可能创建的许多领域特定类型,在这种情况下,您需要自己创建编码器
  • 在此页面上的具体示例中,df.mapDataset返回一个Row类型,稍等一会儿,Row类型不在Spark预定义的具有编码器的类型列表中,因此您需要自己创建一个。
  • 我承认,为Row类型创建编码器与上面的链接所描述的方法有些不同,您必须使用RowEncoder,它以StructType作为参数描述行类型,就像@zero323以上提供的那样:
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))

// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)

0
在我的Spark 2.4.4版本中,我必须导入隐式转换。这是一个通用的答案。
val spark2 = spark
import spark2.implicits._

val data = df.rdd.map(row => my_func(row))

在 my_func 函数执行了一些操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接