设计一个API给客户端连接第三方服务。

4
我相对于Scala还比较新手,正在开发一个应用程序(库),它是第三方服务的客户端(我无法修改服务器端并且它使用自定义二进制协议)。我使用Netty进行网络通信。
我想设计一个API,使用户可以:
- 向服务器发送请求 - 发送请求到服务器并异步获取响应 - 订阅由服务器触发的事件(具有多个异步事件处理程序,这些处理程序也应该能够发送请求)
我不确定如何设计它。在探索Scala时,我遇到了大量关于Actor模型的信息,但我不确定它是否适用于此,如果适用,应该如何使用。
我希望能得到一些关于我应该采取的方式的建议。

我不知道你的问题是什么,或者除了“使用事件构建 API”之外,你想做什么。你能提供更多的解释吗? - wheaties
@wheaties 好的,我想要“使用事件构建API”,并且正如我在其中一条评论中所说,使用“交互式”对象来表示服务中的实体。我正在寻求设计模式,以帮助我使API用户友好、“scala友好”和灵活。 - izstas
3个回答

4
一般来说,向用户代码公开异步功能的Scala方式是返回一个scala.concurrent.Future[T]
如果您采用Actor方式,可以考虑将二进制通信封装在单个Actor类的上下文中。您可以使用Akka路由支持扩展此代理Actor的实例,并且您可以轻松使用ask模式生成响应Future。 有一些不错的库(例如Spray、Play Framework)可以将RESTful甚至WebSocket层包装在Akka之上,几乎没有任何难度。
Publisher功能的一个好模型可能是定义一个Publisher特质,您可以将其混合到某些Actor子类中。这可以定义一些状态来跟踪订阅者、处理SubscribeUnsubscribe消息,并提供某种方便的方法来广播消息:
  /**
    * Sends a copy of the supplied event object to every subscriber of
    * the event object class and superclasses.
    */
  protected[this] def publish[T](event: T) {
    for (subscriber <- subscribersFor(event)) subscriber ! event
  }

这些只是一些基于最近项目中类似做法的想法。如果您需要更具体的指导,请详细说明您的用例。此外,如果您确实有兴趣在Scala中探索Actor,Akka用户列表是一个很好的资源,可以解答一般性问题。


我还想拥有代表“服务实体”的对象。我可以使用case类,但是我有两个要求:1)其中一些实体可以从服务接收实时更新,并且我希望尽快将它们提供给API用户;2)我希望这些对象具有相互交互/修改的方法。我能否在Actor模型中很好地实现它,或者我正在完全错误的方向上思考? - izstas

2
请看spray-client library。它提供了HTTP请求功能(我假设您要通信的服务器是一个网络服务?)。它为构建请求提供了相当不错的DSL,并且全部都是关于异步操作的。它在幕后确实使用了akka Actor模型,但您不必构建自己的Actor来使用它。相反,您可以只使用Scala的Future模型来异步处理事物。有关Future模型的良好介绍可以在这里找到。
spray-client的基本构建块是“pipeline”,它将HttpRequest映射到包含HttpResponse的Future中。
// this is from the spray-client docs
val pipeline: HttpRequest => Future[HttpResponse] = sendReceive

val response: Future[HttpResponse] = pipeline(Get("http://spray.io/"))

您可以将这个基本构建块拿来,在几个步骤中构建出一个客户端 API。首先,创建一个类来设置管道并定义一些中间帮助程序,演示 响应转换 技术。
import scala.concurrent._
import spray.can.client.HttpClient
import spray.client.HttpConduit
import spray.client.HttpConduit._
import spray.http.{HttpRequest, HttpResponse, FormData}
import spray.httpx.unmarshalling.Unmarshaller
import spray.io.IOExtension

type Pipeline = (HttpRequest) => Future[HttpResponse]

// this is basically spray-client boilerplate     
def createPipeline(system: ActorSystem, host: String, port: Int): Pipeline = {
    val httpClient = system.actorOf(Props(new HttpClient(IOExtension(system).ioBridge())))
    val conduit = system.actorOf(props = Props(new HttpConduit(httpClient, host, port)))

    sendReceive(conduit)
}

private var pipeline: Pipeline = _
// unmarshalls to a specific type, e.g. a case class representing a datamodel
private def unmarshallingPipeline[T](implicit ec:ExecutionContext, um:Unmarshaller[T]) = (pipeline ~> unmarshal[T])
// for requests that don't return any content.  If you get a successful Future it worked; if there's an error you'll get a failed future from the errorFilter below.
private def unitPipeline(implicit ec:ExecutionContext) = (pipeline ~>  { _:HttpResponse => () })
// similar to unitPipeline, but where you care about the specific response code.
private def statusPipeline(implicit ec:ExecutionContext) = (pipeline -> {r:HttpResponse => r.status})

// if you want standard error handling create a filter like this
// RemoteServerError and RemoteClientError are custom exception classes
// not shown here.
val errorFilter = { response:HttpResponse =>
  if(response.status.isSuccess) response
  else if(response.status.value >= 500) throw RemoteServerError(response)
  else throw RemoteClientError(response)
}

pipeline = (createPipeline(system, "yourHost", 8080) ~> errorFilter)

然后,您可以将这些方法封装到与特定请求/响应相关联的方法中,从而成为公共API。例如,假设服务具有“ping” GET端点,返回一个字符串(“pong”),以及一个“form” POST端点,您在其中发布表单数据并收到一个DataModel作为回应:
def ping()(implicit ec:ExecutionContext, um:Unmarshaller[String]): Future[String] =
    unmarshallingPipeline(Get("/ping"))

def form(formData: Map[String, String])(implicit ec:ExecutionContext, um:Unmarshaller[DataModel]): Future[DataModel] = 
    unmarshallingPipeline(Post("/form"), FormData(formData)) 

然后有人可以像这样使用API:
import scala.util.{Failure, Success}

API.ping() foreach(println)  // will print out "pong" when response comes back

API.form(Map("a" -> "b") onComplete {
    case Success(dataModel) => println("Form accepted. Server returned DataModel: " + dataModel)
    case Failure(e) => println("Oh noes, the form didn't go through! " + e)
}

我不确定您是否能在spray-client中直接获得有关订阅事件的第三个要点的支持。这些事件是否由服务器生成并以某种方式发送到客户端,超出了特定HTTP请求的范围?如果是这样,那么spray-client可能无法直接帮助您(尽管您的事件处理程序仍然可以使用它来发送请求)。事件是否发生在客户端,例如最初由服务器响应触发的延迟处理完成?如果是这样,您实际上可能只需要使用Future中的功能就可以取得很大进展,但根据您的用例,使用Actors可能是有意义的。

我感谢您详细的回答,但是正如我在问题中所说,服务器端使用自定义二进制协议。 - izstas
哎呀,我错过了。显然,你需要自己编写处理协议的代码。你可以在Akka的网络实现基础上构建,参见这里。我自己还没有深入研究过它,无法提供更详细的建议,抱歉。 - ryryguy

2

可观察对象

这个例子看起来像是“可观察对象”模式的一个很好的实现。该模式源自于 .NET响应式扩展,但也适用于 JavaScala。该库由Netflix提供,具有非常好的质量。

这个模式有一个良好的理论基础---从范畴论意义上来说,它是迭代器的对偶。但更重要的是,它有很多实用的想法。特别是在处理时间方面非常出色,例如你可以限制希望获取的事件速率。

通过可观察对象,您可以在非常高的层次上处理事件。在 .NET 中,它看起来很像 SQL 查询。您可以注册某些事件(“FROM”),筛选它们(“WHERE”),最后进行处理(“SELECT”)。在 Scala 中,您可以使用标准的单调 API (map、filter、flatMap)和当然还有“for 表达式”。

一个示例可能如下:

stackoverflowQuestions.filter(_.tag == "Scala").map(_.subject).throttleLast(1 second).subscribe(println _)

观察者模式可以解决许多与事件相关的问题:

  • 处理订阅
  • 处理错误
  • 过滤和预处理事件
  • 缓冲事件

API的结构

您的API应为每个事件源提供一个观察者。对于过程调用,您提供一个函数,将该函数调用映射到观察者。此函数将调用远程过程并通过观察者提供结果。

实现细节

将以下依赖项添加到build.sbt中:

libraryDependencies +=     "com.netflix.rxjava" % "rxjava-scala" % "0.15.0"

今日免费次数已满, 请开通会员/明日再来
private val callbackFunc : (rx.lang.scala.Observer[String]) => rx.lang.scala.Subscription = { o =>
  val listener = {
    case Value(s) => o.onNext(s)
    case Error(e) => o.onError(o)
  }

  remote.subscribe(listener)

  // Return an interface to cancel the subscription
  new Subscription {
    val unsubscribed = new AtomicBoolean(false)
    def isUnsubscribed: Boolean = unsubscribed.get()

    val asJavaSubscription: rx.Subscription = new rx.Subscription {
      def unsubscribe() {
        remote.unsubscribe(listener)
        unsubscribed.set(true)
      }
    }
  }

如果您有具体问题,只需提问,我可以完善答案。

其他资源

在coursera上,Martin Odersky等人提供了一个非常好的课程,涵盖了Observables和其他反应技术。


我已经阅读了一些文章,观察者似乎是处理事件的非常好的方式。如果永远不会有多个结果对象,使用可观察对象返回过程调用的结果是否比使用常规Scala futures有任何优势? - izstas
主要优势在于当API的其余部分也由可观察对象组成时。然后,您可以轻松地组合多个(可观察)事件流(例如使用++、zip或flatMap),然后使用生成的流进行操作。 - stefan.schwetschke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接