使用Akka实现的分支和合并

7
问题陈述:我有一组证券需要以并行方式处理。在Java中,我使用线程池来处理每个证券,并使用latch进行倒计时。完成后,我进行一些合并等操作。
因此,我向我的SecurityProcessor(它是一个执行者)发送消息,并等待所有未来完成。最后,我使用MergeHelper进行后处理。SecurityProcessor获取安全性并执行一些I / O和处理,然后回复一个Security。
  val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures += (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get

这个设计是否正确,使用akka有更好/更酷的实现方式吗?

1个回答

8
val futureResult = Future.sequence(
                  portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
                ) map { securities => MergeHelper.merge(portfolio, securities) }

真的很喜欢这个建议,一切都按预期进行,直到我不得不将其拆分并添加一堆Eventhandler.info语句来调试问题 :( - Debajyoti Roy
def debug[T](t: T): T = { EventHandler.info(t); t } 将此代码翻译为中文。 - Viktor Klang
2
非常高兴你喜欢它,请在Akka邮件列表上分享你的欣喜和/或痛苦之泪! - Viktor Klang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接