Akka用于REST轮询

12

我正在尝试将一个大型的Scala + Akka + PlayMini应用程序与外部REST API进行接口连接。思路是定期轮询(基本上每1到10分钟)根URL,然后浏览子级URL以提取数据,然后发送到消息队列。

我想出了两种方法来实现这个:

第一种方法

创建一个Actor层次结构来匹配API的资源路径结构。在Google Latitude的情况下,例如:

在这种情况下,每个Actor负责定期轮询其关联的资源,并为下一级路径资源创建/删除子Actor(即,actor 'latitude/v1/location'通过轮询https://www.googleapis.com/latitude/v1/location学习到所有位置后,创建1、2、3等Actor)。

第二种方法

创建一个池,其中包含相同的轮询Actor,这些Actor接收包含资源路径的轮询请求(由路由器负责负载均衡),对URL进行一次轮询,执行一些处理,并安排轮询请求(用于下一级资源和已轮询的URL)。在Google Latitude中,例如:

1个路由器,n个轮询者Actor。针对https://www.googleapis.com/latitude/v1/location的初始轮询请求会导致多个新的(即时)轮询请求,如https://www.googleapis.com/latitude/v1/location/1https://www.googleapis.com/latitude/v1/location/2等,以及一个(延迟的)轮询请求,即https://www.googleapis.com/latitude/v1/location

我已经实现了这两种解决方案,暂时无法观察到任何重要的性能差异,至少对于我感兴趣的API和轮询频率来说是如此。我发现第一种方法在某种程度上更容易理解,可能比第二种方法更容易使用system.scheduler.schedule(...),而第二种方法(我需要使用scheduleOnce(...))则更易于编写。此外,假定资源通过几个级别进行嵌套并且生命周期较短(例如,在每次轮询之间可能会添加/删除多个资源),Akka的生命周期管理使得在第一种情况下轻松地消除整个分支。第二种方法应该(理论上)更快,代码写起来也相对容易。

我的问题是:

  1. 哪种方法似乎是最好的(就性能、可扩展性和代码复杂性而言)?
  2. 您是否认为任何一种方法的设计存在问题(尤其是第一种方法)?
  3. 有人试图实现过类似的功能吗?如何实现的?

谢谢!


我更担心错误路径。如果特定中间路径出现瞬态故障会发生什么?你会重试吗?任何一个子actor的失败是否也会导致父actor的失败?在足够大的规模下,几乎可以保证会有一些子actor的瞬态故障。 - Ross Judson
1个回答

1
为什么不创建一个主轮询器,然后按计划启动异步资源请求呢?
我不是Akka的专家,但我尝试了一下:
轮询器对象遍历要获取的资源列表:
import akka.util.duration._
import akka.actor._
import play.api.Play.current
import play.api.libs.concurrent.Akka

object Poller {
  val poller = Akka.system.actorOf(Props(new Actor {
    def receive = {
      case x: String => Akka.system.actorOf(Props[ActingSpider], name=x.filter(_.isLetterOrDigit)) ! x
    }
  }))

  def start(l: List[String]): List[Cancellable] =
    l.map(Akka.system.scheduler.schedule(3 seconds, 3 seconds, poller, _))

  def stop(c: Cancellable) {c.cancel()}
}

读取资源异步并触发更多的异步读取的执行器。如果情况允许,您可以将消息调度安排在计划表上而不是立即调用:

import akka.actor.{Props, Actor}
import java.io.File

class ActingSpider extends Actor {
  import context._
  def receive = {
    case name: String => {
      println("reading " + name)
      new File(name) match {
        case f if f.exists() => spider(f)
        case _ => println("File not found")
      }
      context.stop(self)
    }
  }

  def spider(file: File) {
    io.Source.fromFile(file).getLines().foreach(l => {
      val k = actorOf(Props[ActingSpider], name=l.filter(_.isLetterOrDigit))
      k ! l
    })
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接