- Spark 2.1.1
- Scala 2.11.8
- Java 8
- Linux Ubuntu 16.04 LTS
I want to convert my RDD into a Dataset. To do this, I use the toDS() method from spark.implicits, but I get the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "date")
- root class: "observatory.TemperatureRow"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
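In my case I have to use the type java.time.LocalDate; I cannot use java.sql.Date. I have read that I need to tell Spark how to convert Java types into SQL types, so I wrote the following two implicit functions: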
implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature)
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)
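As a sanity check, the two conversions should round-trip without loss, because LocalDate.toString emits ISO-8601 (yyyy-MM-dd) and LocalDate.parse reads that format back. The sketch below checks this outside Spark, with the case classes copied from the question. The object name RoundTripCheck is hypothetical, and the conversions are plain defs rather than implicits so they can be called directly.

```scala
import java.time.LocalDate

object RoundTripCheck {
  // Case classes reproduced from the question
  case class Location(lat: Double, lon: Double)
  case class TemperatureRow(date: LocalDate, location: Location, temperature: Double)
  case class EncodedTemperatureRow(date: String, location: Location, temperature: Double)

  // Same logic as the question's implicit conversions, written as ordinary methods
  def toSerialized(t: TemperatureRow): EncodedTemperatureRow =
    EncodedTemperatureRow(t.date.toString, t.location, t.temperature)

  def fromSerialized(t: EncodedTemperatureRow): TemperatureRow =
    TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

  def main(args: Array[String]): Unit = {
    val row = TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4, 5.1), 4.9)
    val encoded = toSerialized(row)
    println(encoded.date) // prints 2017-01-01
    // Decoding the encoded row gives back an equal TemperatureRow
    assert(fromSerialized(encoded) == row)
  }
}
```

So the conversions themselves are sound; the error comes from how Spark looks up encoders, not from this logic.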
Here is some of my application's code:
import java.time.LocalDate
import org.apache.spark.rdd.RDD

case class Location(lat: Double, lon: Double)
case class TemperatureRow(
date: LocalDate,
location: Location,
temperature: Double
)
case class EncodedTemperatureRow(
date: String,
location: Location,
temperature: Double
)
val s = Seq[TemperatureRow](
TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9),
TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5)
)
import spark.implicits._
val temps: RDD[TemperatureRow] = sc.parallelize(s)
val tempsDS = temps.toDS
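toDS() needs an implicit value of type Encoder[TemperatureRow]. When Scala searches for that implicit value it looks for Encoder instances only; implicit conversions between TemperatureRow and EncodedTemperatureRow are never consulted, so they cannot help. One workaround that keeps LocalDate in the domain type is to apply the conversion explicitly before creating the Dataset and convert back after reading. This is a minimal sketch, assuming a local SparkSession (in the question, spark and sc already exist); the object name ExplicitConversion is hypothetical.

```scala
import java.time.LocalDate
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

// Case classes from the question, defined at top level so Spark can derive encoders
case class Location(lat: Double, lon: Double)
case class TemperatureRow(date: LocalDate, location: Location, temperature: Double)
case class EncodedTemperatureRow(date: String, location: Location, temperature: Double)

object ExplicitConversion {
  // Non-implicit versions of the question's conversions
  def toSerialized(t: TemperatureRow): EncodedTemperatureRow =
    EncodedTemperatureRow(t.date.toString, t.location, t.temperature)

  def fromSerialized(t: EncodedTemperatureRow): TemperatureRow =
    TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session just for illustration
    val spark = SparkSession.builder().master("local[*]").appName("explicit-conversion").getOrCreate()
    import spark.implicits._

    val s = Seq(
      TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4, 5.1), 4.9),
      TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5, 2.5), 5.5)
    )
    val temps: RDD[TemperatureRow] = spark.sparkContext.parallelize(s)

    // EncodedTemperatureRow contains only String, Double and a nested case class,
    // so spark.implicits._ can derive an Encoder for it
    val encodedDS: Dataset[EncodedTemperatureRow] = temps.map(t => toSerialized(t)).toDS()
    encodedDS.printSchema()

    // Convert back to TemperatureRow after collecting to the driver
    val restored: Array[TemperatureRow] = encodedDS.collect().map(e => fromSerialized(e))
    restored.foreach(println)

    spark.stop()
  }
}
```

The resulting Dataset keeps a real columnar schema (date is stored as a string column), at the cost of converting by hand at the boundaries.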
I don't understand why Spark looks for an encoder for java.time.LocalDate, since I have provided implicit conversions between TemperatureRow and EncodedTemperatureRow...
Spark needs an implicit value of type org.apache.spark.sql.Encoder[T] to encode values of type T, so you need to provide an implicit Encoder[TemperatureRow]. Creating such an encoder is not easy; see https://dev59.com/FloV5IYBdhLWcg3wIb74#39442829. - Tzach Zohar