从线程模型转向Actor模型

7
尝试从Actor的角度而不是线程的角度思考如何处理问题。以下用例让我有点困惑:
考虑一个系统,它有一个生产者进程创建工作(例如通过从文件中读取数据),还有一些工作者进程消耗这些工作(例如解析数据并将其写入数据库)。工作的生产和消耗速率可能会变化,系统应该保持稳健。例如,如果工作者无法跟上,则生产者应检测到此情况并最终减慢或等待。
使用线程实现这很容易:
val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
  new Thread() { 
    override def run() = {
      while (true) {
        try {
          // take next unit of work, waiting if necessary
          val work = queue.take()
          process(work)
        }
        catch {
          case e:InterruptedException => return
        }
      }
    }
  }
}

// start the workers
workers.foreach(_.start())

while (producer.hasNext) {
  val work = producer.next()
  // add new unit of work, waiting if necessary
  queue.put(work)
}

while (!queue.isEmpty) {
  // wait until queue is drained
  queue.wait()
}

// stop the workers
workers.foreach(_.interrupt())

这个模型并没有什么问题,我之前也成功地使用过它。但是这个例子可能有点啰嗦了,使用Executor或CompletionService会更适合这个任务。不过我喜欢Actor的抽象概念,并且认为在许多情况下更容易理解。有没有一种方法可以使用Actor重写这个例子,特别是确保没有缓冲区溢出(例如满邮箱、丢失消息等)?

1个回答

3
由于演员处理消息时是“离线”的(即消息的消耗与其接收无关),因此很难看出如何精确地模拟“生产者等待消费者赶上”的情况。
我能想到的唯一办法是,消费者从制作人演员(使用reply)请求工作:
case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
  def act = {
    prod ! MoreWorkPlease
    loop {
      react {
        case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
      }
    }
  }
}

class Producer extends Actor {
  def act = loop {
    react {
      case MoreWorkPlease => reply(Work(getNextItem))
    }
  }
}

当然,这并不完美,因为生产者不会“预读”,只有在消费者准备好时才开始工作。使用方法大致如下:

val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()

嗯,那是我曾经想过的一个解决方案。这可能足够了,但我的担忧是如果工作者超过生产者,则缺乏工作缓冲会导致性能下降。 - toluju
@toluju - 开始时让每个消费者请求工作,并让生产者 响应这些消息,而是接收它们,如果还没有更多的工作要做,则将它们放入队列中。 (然后,一旦有工作,就可以将其分配给队列中的项目。) - Rex Kerr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接