Scala中的并发Akka代理

4

我目前正在进行一个Scala项目,我决定使用Akka的代理库来代替Actor模型,因为它允许一种更加函数化的并发处理方式。然而,我在同时运行多个不同的代理时遇到了问题。似乎我最多只能同时运行三到四个代理。

import akka.actor._
import akka.agent._
import scala.concurrent.ExecutionContext.Implicits.global

object AgentTester extends App {
// Create the system for the actors that power the agents
implicit val system = ActorSystem("ActorSystem")

// Create an agent for each int between 1 and 10
val agents = Vector.tabulate[Agent[Int]](10)(x=>Agent[Int](1+x))

// Define a function for each agent to execute
def printRecur(a: Agent[Int])(x: Int): Int = {
    // Print out the stored number and sleep.
    println(x)
    Thread.sleep(250)

    // Recur the agent
    a sendOff printRecur(a) _

    // Keep the agent's value the same
    x
}

// Start each agent
for(a <- agents) {
    Thread.sleep(10)
    a sendOff printRecur(a) _
}
}

上面的代码创建了一个代理,包含1到10之间的每个整数。底部的循环将printRecur函数发送到每个代理。程序的输出应该显示1到10的数字每隔四分之一秒打印一次(尽管不按顺序)。然而,由于某些原因,我的输出仅显示输出1到4。
在Akka中是否有更加规范的使用代理的方式可以解决这个问题?我来自Clojure背景,在那里成功地使用过这种模式,所以我天真地在Scala中使用了相同的模式。
1个回答

5
我的猜测是你正在运行一个4核的系统,这也是为什么你只看到数字1-4的部分原因。在这里起作用的主要因素是你正在使用默认执行上下文,我猜你的系统使用了一个线程池,其中只有4个线程(每个核心一个)。通过你以这种递归方式编码的方式,我的猜测是前4个代理永远不会放弃线程,他们是唯一会打印任何东西的人。你可以很容易地通过删除这一行来解决这个问题:
import scala.concurrent.ExecutionContext.Implicits.global

在创建ActorSystem后,添加此行代码。
import system.dispatcher

这将使用默认的Actor系统调度程序,它是一个fork-join调度程序,似乎没有您在样例中导入的默认执行上下文所遇到的问题。

您也可以考虑使用send而不是sendOff,因为它将使用构建代理时可用的执行上下文。我认为当您有一个明确想要使用另一个执行上下文的情况时,才会使用sendOff


使用send而不是sendOff并没有什么区别,但更换导入却有所改变。谢谢! - DrPepper

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接