如何在数据集中使用java.time.LocalDate(失败并显示java.lang.UnsupportedOperationException:找不到编码器)?

17
  • Spark 2.1.1
  • Scala 2.11.8
  • Java 8
  • Linux Ubuntu 16.04 LTS

我想把我的RDD转换成Dataset。为此,我使用implicits方法toDS(),但是出现了以下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "date")
- root class: "observatory.TemperatureRow"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)

在我的情况下,我必须使用类型java.time.LocalDate,不能使用java.sql.data。我已经阅读到需要通知Spark如何将Java类型转换为Sql类型,为此,我构建了以下两个隐式函数:

implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature)
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

以下是关于我的应用程序的一些代码:
case class Location(lat: Double, lon: Double)

case class TemperatureRow(
                             date: LocalDate,
                             location: Location,
                             temperature: Double
                         )

case class EncodedTemperatureRow(
                             date: String,
                             location: Location,
                             temperature: Double

val s = Seq[TemperatureRow](
                    TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9),
                    TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5)
                )

import spark.implicits._
val temps: RDD[TemperatureRow] = sc.parallelize(s)
val tempsDS = temps.toDS

我不知道为什么Spark要搜索一个java.time.LocalDate的编码器,我提供了TemperatureRowEncodedTemperatureRow的隐式转换...


1
你提供的隐式转换对于Spark来说是毫无用处的 - 框架如何“知道”将对象转换为EncodedTemperatureRow?Spark需要类型为org.apache.spark.sql.Encoder[T]的隐式值来编码类型为T的值,因此您需要提供一个隐式的Encoder[TemperatureRow]。创建这样的编码器并不容易,请参见https://dev59.com/FloV5IYBdhLWcg3wIb74#39442829。 - Tzach Zohar
1个回答

16

java.time.LocalDate在Spark 2.2及以下版本中不受支持(我曾经试图为该类型编写一个Encoder,但是未能成功(失败了))。

你需要将java.time.LocalDate转换成一些其他的受支持类型(例如java.sql.Timestamp或者java.sql.Date),或者转换成字符串表示的日期时间(如epoch或者date-time)。


3
为了澄清任何来到这里的人:即使是Spark的2.2版本也无法处理JDK8的日期/时间类。在https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala中的转换仅查看JDK8之前的日期/时间类。 - wishihadabettername
那么你怎么做呢?即编码请。 - CpILL
你只需使用 mapwithColumn 将其映射到其他类型即可避免此问题。 - Jacek Laskowski
2
看起来Spark 3将支持它。请参见https://issues.apache.org/jira/browse/SPARK-27222和https://github.com/apache/spark/commit/0f4f8160e6d01d2e263adcf39d53bd0a03fc1b73#diff-f52e4a77ff9291d86359d609a9757781。 - dvir
为什么it失败了?有任何评论吗? - jack

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接