尝试从Actor的角度而不是线程的角度思考如何处理问题。以下用例让我有点困惑:
考虑一个系统,它有一个生产者进程创建工作(例如通过从文件中读取数据),还有一些工作者进程消耗这些工作(例如解析数据并将其写入数据库)。工作的生产和消耗速率可能会变化,系统应该保持稳健。例如,如果工作者无法跟上,则生产者应检测到此情况并最终减慢或等待。
使用线程实现这很容易:
考虑一个系统,它有一个生产者进程创建工作(例如通过从文件中读取数据),还有一些工作者进程消耗这些工作(例如解析数据并将其写入数据库)。工作的生产和消耗速率可能会变化,系统应该保持稳健。例如,如果工作者无法跟上,则生产者应检测到此情况并最终减慢或等待。
使用线程实现这很容易:
val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
new Thread() {
override def run() = {
while (true) {
try {
// take next unit of work, waiting if necessary
val work = queue.take()
process(work)
}
catch {
case e:InterruptedException => return
}
}
}
}
}
// start the workers
workers.foreach(_.start())
while (producer.hasNext) {
val work = producer.next()
// add new unit of work, waiting if necessary
queue.put(work)
}
while (!queue.isEmpty) {
// wait until queue is drained
queue.wait()
}
// stop the workers
workers.foreach(_.interrupt())
这个模型并没有什么问题,我之前也成功地使用过它。但是这个例子可能有点啰嗦了,使用Executor或CompletionService会更适合这个任务。不过我喜欢Actor的抽象概念,并且认为在许多情况下更容易理解。有没有一种方法可以使用Actor重写这个例子,特别是确保没有缓冲区溢出(例如满邮箱、丢失消息等)?