演员邮箱溢出。Scala

4

我目前正在使用Scala的两个角色进行工作。其中一个是生产者,它生成一些数据并将其发送给解析器。生产者通过消息发送HashMap[String,HashMap[Object,List[Int]]](以及this来标记发送者):

parcer ! (this,data)

解析器始终在等待以下类似信息:
def act(){
    loop{
      react{
        case (producer, data)=> parse(data);
      }
    }
}

在正常情况下,程序运行得非常完美。但是,当处理大量数据和发送许多消息时(哈希大约有10^4个元素,内部哈希约有100个元素,列表长度为100),程序会崩溃。它没有显示任何错误或异常,只是停止了。
问题似乎是生产者的工作速度比解析器快得多(而且目前我不想使用超过一个解析器)。
在阅读了 scala mailbox size limit 之后,我想知道我的解析器的邮箱是否已经达到其限制。该帖子还提供了一些解决方案,但我首先需要确保这是问题所在。如何测试这个问题?
有没有一种方法可以知道演员的内存限制?那么读取邮箱中已使用/未使用的内存呢?
如果在 该链接 中没有发布的工作流程建议也是欢迎的。
谢谢,

1
如果你想看不同的实现,可以看一下Akka1,它具有负载均衡2和工作窃取3 - Viktor Klang
1个回答

4
首先,你不需要显式地传递发送方,因为Scala actors框架会自动跟踪发送方。你可以使用方法sender来访问消息的发送方。
如此实现: scala.actors.MQueue,一个actor的邮箱是以链表的形式实现的,因此只受堆大小的限制。
但是,如果你担心生产者非常快,而消费者非常慢,我建议你探索一种调节机制。但是我不建议采用接受答案中提出的方法,详见问题scala mailbox size limit
通常在系统负载过重时尝试发送过载消息似乎不是一个好主意。如果你的系统太忙而无法检查过载怎么办?如果过载消息的接收方太忙而无法处理它怎么办?此外,丢弃消息听起来对我来说并不是一个很好的主意。我认为你希望所有的工作项都能得到可靠的处理。
此外,我不会依赖于mailboxSize来确定负载。你不能区分不同的消息类型,并且只能从消费者本身中检查,而不是从生产者中。
我建议使用一种方法,在消费者知道自己可以处理时请求更多的工作。
下面是一个简单的示例,说明如何实现这一点。
import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

case class Produce(count: Int)
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {

  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      } 
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
    if(!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {

  start

  private var remaining = 0

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}

嗨,Ruediger, 谢谢你的回答。所以邮件箱并不像我想象的那样实现! 顺便问一下,你如何访问消息的发送者? - Skuge
在你的actor内部,有一个sender方法。它总是返回最后一条接收到的消息的发送者。我在我给出的示例中在Producer actor的produce方法中使用了它。 - Ruediger Keller

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接