Akka Actors:需要一个例子来理解一些基础知识

15
我正在使用Akka进行尝试,需要一些关于实现我想法的建议。我想要一个Actor,可以发送DownloadFile(URI, File)消息并下载文件。由于可以并行下载,我不想一个接一个地下载文件,而是设置并发下载的限制。
在Akka中,如何建模这样的东西是有意义的?其他需要考虑的事情包括:如果其中一个“工作”Actor出现问题会发生什么?如何重试下载?等等。
我知道这是一个非常复杂的问题,但我希望有人能花时间回答它!谢谢!
2个回答

24

试试这个;它会创建三个下载器,但你可以配置它来创建任意数量的下载器,以便同时处理三个下载请求。

sealed trait DownloaderMessage
case class DownloadFile(uri: URI, file: File) extends DownloaderMessage

object Downloader {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool").build
}

class Downloader extends Actor {
  self.lifeCycle = Permanent
  self.dispatcher = Downloader.dispatcher
  def receive = {
    case DownloadFile(uri, file) =>
      // do the download
  }
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
  val downloaders: List[ActorRef]
  val seq = new CyclicIterator[ActorRef](downloaders)
}

trait DownloadManager extends Actor {
  self.lifeCycle = Permanent
  self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
  val downloaders: List[ActorRef]
  override def preStart = downloaders foreach { self.startLink(_) }
  override def postStop = self.shutdownLinkedActors()
}

class DownloadService extends DownloadManager with CyclicLoadBalancing {
  val downloaders = List.fill(3)(Actor.actorOf[Downloader])
}

8
创建一个DownloadActor类来管理下载,让所有的DownloadActors共享同一个调度程序(Dispatcher),根据需要配置调度程序(最大线程数、队列大小等),让所有的DownloadActors连接到同一个监督器(Supervisor)上,根据需要配置监督器(可能是OneForOneStrategy),对于每个新的下载创建一个新的DownloadActor或使用一个适当的InfiniteIterator负载均衡器将下载分配给DownloadActors。
如果您使用AsyncHttpClient来下载文件,则它支持断点续传。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接