在Spark中创建DataSet时出现Scala反射异常

3
我想在Spark Jobserver上运行Spark作业。在执行过程中,我遇到了一个异常:
堆栈信息:
java.lang.RuntimeException: scala.ScalaReflectionException: JavaMirror中的类com.some.example.instrument.data.SQLMapping具有类型为class org.apache.spark.util.MutableURLClassLoader的org.apache.spark.util.MutableURLClassLoader@55b699ef,并且父级是类型为class sun.misc.Launcher$ AppClassLoader的sun.misc.Launcher$AppClassLoader,它的classpath是[file:/app/spark-job-server.jar],而classpath jars[/classpath]未找到。 在scala.reflect.internal.Mirrors $ RootsBase.staticClass(Mirrors.scala:123)处 在scala.reflect.internal.Mirrors $ RootsBase.staticClass(Mirrors.scala:22)处 在com.some.example.instrument.DataRetriever $$ anonfun $ combineMappings $ 1$$ typecreator15 $ 1.apply(DataRetriever.scala:136)处 在scala.reflect.api.TypeTags $ WeakTypeTagImpl.tpe $ lzycompute(TypeTags.scala:232)处 在scala.reflect.api.TypeTags $ WeakTypeTagImpl.tpe(TypeTags.scala:232)处 在org.apache.spark.sql.catalyst.encoders.ExpressionEncoder $ .apply(ExpressionEncoder.scala:49)处 在org.apache.spark.sql.Encoders $ .product(Encoders.scala:275)处 在org.apache.spark.sql.LowPrioritySQLImplicits $ class.newProductEncoder(SQLImplicits.scala:233)处 在org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)处 在com.some.example.instrument.DataRetriever $$ anonfun $ combineMappings $ 1.apply(DataRetriever.scala:136)处 在com.some.example.instrument.DataRetriever $$ anonfun $ combineMappings $ 1.apply(DataRetriever.scala:135)处 在scala.util.Success $$ anonfun $ map $ 1.apply(Try.scala:237)处 在scala.util.Try $ .apply(Try.scala:192)处 在scala.util.Success.map(Try.scala:237)处 在scala.concurrent.Future $$ anonfun $ map $ 1.apply(Future.scala:237)处 在scala.concurrent.Future $$ anonfun $ map $ 1.apply(Future.scala:237)处 在scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)处 在scala.concurrent.impl.ExecutionContextImpl $ AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)处 在scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)处 在scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)处 在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)处 在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)处
在DataRetriever中,我将简单的case class转换为DataSet。
case class定义:
case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)

引起问题的代码:

import session.implicits._
def someFunc(future: Future[ID]): Dataset[SQLMappins] = {
future.map {f =>
val seq: Seq[SQLMapping] = getFromEndpoint(f)
val ds: Dataset[SQLMapping] = seq.toDS()
...
 }
}

这项工作有时会正常运行,但如果我重新运行该工作,则会抛出异常。

更新于2018年3月28日 我忘了提及一个细节,结果证明它很重要。数据集是在Future内构建的。

1个回答

1

在future中调用toDS()会导致ScalaReflectionException异常。

我决定在future.map之外构建DataSet。

您可以使用此示例作业验证无法在future.map中构建数据集。

package com.example.sparkapplications

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobValidation

object FutureJob extends SparkJob{
  override def runJob(sc: SparkContext,
                      jobConfig: Config): Any = {
    val session = SparkSession.builder().config(sc.getConf).getOrCreate()
    import session.implicits._
    val f = Future{
      val seq = Seq(
        Dummy("1", 1),
        Dummy("2", 2),
        Dummy("3", 3),
        Dummy("4", 4),
        Dummy("5", 5)
      )

      val ds = seq.toDS

      ds.collect()
    }

    Await.result(f, 10 seconds)
  }

  case class Dummy(id: String, value: Long)
  override def validate(sc: SparkContext,
                        config: Config): SparkJobValidation = SparkJobValid
}

如果问题仍然存在,稍后我将提供有关使用Spark 2.3.0以及直接通过spark-submit传递jar的信息。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接