我开始使用Spark Streaming来处理实时数据源。我的情况是我有一个Akka actor接收器使用 "with ActorHelper",然后我有我的Spark作业执行一些映射和转换,然后我想将结果发送给另一个actor。
我的问题在于最后一部分。当尝试向另一个actor发送消息时,Spark会引发异常:
15/02/20 16:43:16 WARN TaskSetManager:在第2.0阶段(TID 2,localhost)中失去任务0.0:java.lang.IllegalStateException:尝试反序列化一个未在作用域内的已序列化ActorRef。 使用'akka.serialization.Serialization.currentSystem.withValue(system) {...}'
创建这个最后一个Actor的方式如下:
我的问题在于最后一部分。当尝试向另一个actor发送消息时,Spark会引发异常:
15/02/20 16:43:16 WARN TaskSetManager:在第2.0阶段(TID 2,localhost)中失去任务0.0:java.lang.IllegalStateException:尝试反序列化一个未在作用域内的已序列化ActorRef。 使用'akka.serialization.Serialization.currentSystem.withValue(system) {...}'
创建这个最后一个Actor的方式如下:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")
然后像这样使用它:
result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))
我不确定如何执行建议中的“使用'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'”。我是否需要通过配置设置特殊内容?或者需要以不同方式创建我的actor?
collect()
(如文档中所述)时有足够小的数据集。 - Peter Klipfel