无法将ActorRef反序列化以将结果发送到不同的Actor

4
我开始使用Spark Streaming来处理实时数据源。我的情况是我有一个Akka actor接收器使用 "with ActorHelper",然后我有我的Spark作业执行一些映射和转换,然后我想将结果发送给另一个actor。
我的问题在于最后一部分。当尝试向另一个actor发送消息时,Spark会引发异常:
15/02/20 16:43:16 WARN TaskSetManager:在第2.0阶段(TID 2,localhost)中失去任务0.0:java.lang.IllegalStateException:尝试反序列化一个未在作用域内的已序列化ActorRef。 使用'akka.serialization.Serialization.currentSystem.withValue(system) {...}'
创建这个最后一个Actor的方式如下:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")

然后像这样使用它:

result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))

我不确定如何执行建议中的“使用'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'”。我是否需要通过配置设置特殊内容?或者需要以不同方式创建我的actor?

2个回答

1

以下是访问 Spark 域之外的 Actor 的示例。

/* * 以下是使用 actorStream 插入自定义 Actor 作为接收器的示例 * * 重要提示: * 由于 Actor 可能存在于 Spark 框架之外,因此用户有责任确保类型安全性,即接收到的数据类型和 InputDstream 类型应相同。 * * 例如:actorStream 和 SampleActorReceiver 都被参数化为相同的类型以确保类型安全。 */

val lines = ssc.actorStream[String](
  Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
    host, port.toInt))), "SampleReceiver")

1
我发现如果在发送给演员之前进行收集,它会非常有效。
result.foreachRDD(rdd =>  rdd.collect().foreach(producer ! _))

2
使用此解决方案时请小心。确保在使用collect()(如文档中所述)时有足够小的数据集。 - Peter Klipfel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接