什么是在Scala流中结合akka-http flow的最佳方式?

3

我有一个使用案例,在经过n个Akka-stream流之后,我必须获取其中一个的结果并向HTTP REST API发出请求。

在HTTP请求之前,最后一个akka-stream流类型是字符串:

val stream1:Flow[T,String,NotUsed] = Flow[T].map(_.toString)

现在,应该指定HTTP请求,我考虑了以下内容:
val stream2: Flow[String,Future[HttpRespone],NotUsed] = Flow[String].map(param => Http.singleRequest(HttpRequest(uri=s"host.com/$param")))

然后将它们组合起来:

val stream3 = stream1 via stream2

这是最好的方法吗?你们会推荐哪些方法,为什么?在这个使用案例范围内,一些最佳实践示例将非常有帮助!

提前感谢 :)


你真的需要一个 Flow[String, Future[HttpResponse, NotUsed]] 而不是一个 Flow[String, HttpResponse, NotUsed] 吗? - Rüdiger Klaehn
嘿,Ruediger,Future[String] 实际上是由 Http.singleRequest(...) 返回的内容,不过这并不是问题所在。 - λ Allquantor λ
1个回答

3
您的实现会为每个新参数创建一个到“host.com”的新连接。这是不必要的,会阻止Akka进行某些优化。在幕后,Akka实际上会维护一个连接池以重用打开的连接,但我认为最好在代码中指定您的意图,而不是依赖底层实现。
您可以按照文档中描述的方式建立单个连接:documentation
val connectionFlow: Flow[HttpRequest, HttpResponse, _] = 
  Http().outgoingConnection("host.com")

为了使用此连接流,您需要将字符串路径转换为HttpRequest对象:
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path

def pathToRequest(path : String) = HttpRequest(uri=Uri.Empty.withPath(Path(path)))

val reqFlow = Flow[String] map pathToRequest

最后,将所有流程粘合在一起:

val stream3 = stream1 via reqFlow via connectionFlow

这是不断使用不同请求对象查询同一服务器的最常见模式。


嗨,Ramon!感谢您抽出时间回答我的问题。根据 Http.singleRequest(...) 的文档,它会:*在请求的 (cached) 主机连接池中触发单个 [[HttpRequest]]... 因此,在幕后使用了一个缓存的连接池。 - λ Allquantor λ
我同意我的回答中提到了底层缓存。但是,为什么要依赖于Akka缓存的设置/逻辑呢?当你可以自己建立连接并重复使用同一个连接时,为什么不这样做呢? - Ramón J Romero y Vigil
根据 akka-documentation:请注意,直到返回的流实际上被实现之前,不会尝试任何连接!如果流被多次实现,则将打开几个独立的连接(每个实现一个)。如果连接尝试失败,无论出于什么原因,实现的流都将立即终止,并引发相应的异常。这意味着,您的解决方案将为 N 个实现的流产生 N 个连接。 - λ Allquantor λ
正确。但为什么要实现N个流?你只需要实现一个流并使用不同的HttpRequest对象反复使用即可... - Ramón J Romero y Vigil
你如何处理 Http() 需要构建一个 ActorSystem 的事实呢?(我猜测当构建 Http 时,它会自动在内部管理物化)。如果我只是试图将 Http 包含在我的更大的流中,并稍后物化整个流,这并没有太多意义。 - Krotton

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接