Scala actors能同时处理多个消息吗?

28

最近我提出的问题的回复表明一个actor一次只能处理一条消息。这是真的吗?在《Scala编程》中没有明确说明,该书包含以下片段(第593页)

如果[react方法]发现可以处理的消息,[它]将安排稍后执行该消息的处理并抛出异常。

(强调是我的)。有两个相关的(互斥的)问题:

  1. 假设一个actor可以同时处理多个消息,如何强制actor一次只能处理一条消息(使用receive)?
  2. 假设一个actor一次只能处理一条消息,如何最好地实现一个actor,该actor实际上可以并发处理消息

编辑:进行一些测试似乎证明我错了,actors确实是顺序的。因此需要回答问题#2


6
我真的不明白这个问题怎么能被扣分。如果我在《Scala编程》的一个章节中读了整整一章关于演员模型,但仍然不确定答案,那这就是一个完全合理的问题。 - oxbow_lakes
一个单一的Actor按顺序处理消息,因为这个想法是可以使用Actor来调解对(本来)共享资源的访问,并且不需要在共享资源上进行其他同步。如果消息可以按顺序处理,那么你需要在你的Actor内部使用锁来确保同步错误不会发生。 - Ken Bloom
4个回答

26

演员一次处理一个消息。处理多个消息的经典模式是拥有一个协调员演员来负责消费者演员池。如果您使用 React,则消费者池可能很大,但仍然只使用少量 JVM 线程。以下是一个示例,在其中创建了一个由 10 个消费者和一个协调员组成的池。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

这段代码测试可用的消费者并向该消费者发送请求,可选的方法是随机分配给消费者或使用循环调度器。

根据您的需求,Scala的Futures可能更适合您。例如,如果您不需要使用actor,则所有上述机制都可以编写为

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

你能解释一下 Ready 是什么意思吗? - Seun Osewa
在这个例子中,准备好的是一个作为消息传递的案例类。协调器将一个Ready消息发送给消费者,基本意思是“你准备好了吗?”消费者则回复一个ready消息,表示“是的”。第一个回复的消费者随后会得到请求(Request)。 - harmanjd

6

我认为答案是Actor无法异步处理消息。如果您有一个Actor需要监听可以异步处理的消息,那么可以这样编写:

val actor_ = actor {

  loop {
    react {
      case msg =>
        //create a new actor to execute the work. The framework can then 
        //manage the resources effectively
        actor {
          //do work here
        }
      }
    }
  }

0
你可以尝试使用Futures概念。请使用Futures存储所有这些消息,然后尝试处理它们。

0
如果你想要做多个事情,那么你应该使用多个Actor。使用Actor的主要原因是将工作分配给多个独立的进程。

这是一个没有帮助的答案。假设我有一些演员,它们读取要处理的事件并将它们发送到其他演员进行处理。实际上,即使实际处理代码完全相同(因此我会在一个演员中编写它),也可以同时处理多个事件。如何让演员框架“复制”(或创建池)这些(相同的)演员,以便我可以最好地利用我的编程资源? - oxbow_lakes
我指的是“最佳利用我的处理器资源”。 - oxbow_lakes
1
我的回答没有帮助是因为你的问题没有得到恰当的阐述。一个演员是一个进程而不是一段代码。就像一个对象是一个类的实例一样。你没有问如何创建多个演员来处理多个消息。你想知道如何让一个演员同时处理多个消息。我的回答是为了澄清我认为你对演员模型的误解。 - mamboking
1
你没有尝试澄清任何事情。如果你发布了你评论的内容,你会得到一个赞成票而不是反对票。 - oxbow_lakes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接