map和mapAsync的区别

19
我希望您能为我解释一下 AKKA 流中 map 和 mapAsync 的区别。在文档中提到:

使用 mapAsync 或 mapAsyncUnordered 可以执行涉及外部非流式服务的流转换和副作用

为什么我们不能简单地使用 map 呢?我认为 Flow、Source 和 Sink 都应该是单子性质的,因此 map 应该可以很好地处理这些的延迟性质吧?
1个回答

49

签名

最好通过签名来突出区别: Flow.map 接受一个返回类型为T的函数,而 Flow.mapAsync 接受一个返回类型为Future[T]的函数。

实际示例

例如,假设我们有一个函数,该函数根据用户ID查询数据库以获取用户的全名:

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant

假设有一个 akka 流的 Source,其中包含 UserID 值,我们可以在流中使用 Flow.map 查询数据库并将全名打印到控制台:

val userIDSource : Source[UserID, _] = ???

val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()

这种方法的一个限制是,这个流一次只能进行1个数据库查询。这种串行查询会成为瓶颈,并可能阻止流的最大吞吐量。
我们可以尝试通过使用 "Future" 来进行并发查询以提高性能:
def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }

val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()

这个简单的补充说明存在问题,因为我们有效地消除了背压。
Sink只是拉取Future并添加一个“foreach println”,与数据库查询相比相对较快。流将持续传播需求到源并在Flow.map内产生更多的Futures。因此,没有限制同时运行的databaseLookup数量。无限制的并行查询最终可能会超载数据库。
Flow.mapAsync来拯救,我们可以在保持同时访问db的情况下限制同时查找的数量。
val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()

同时请注意,Sink.foreach 变得更加简单了,它不再需要一个 Future[FullName],而是直接使用 FullName 了。 无序异步 Map 如果维护 UserID 到 FullName 的顺序不必要,那么可以使用Flow.mapAsyncUnordered。例如:你只需要将所有名称打印到控制台中,但并不关心它们打印的顺序。

4
mapAsync 和将异步边界应用于特定阶段是否类似?根据文档,标记异步边界将在一个 actor 中运行每个阶段,只是想知道它们是否相同。 - druuu
使用 "com.typesafe.akka" %% "akka-stream" % "2.6.3",尝试了这个例子,但出现了编译错误 type mismatch; found : akka.stream.scaladsl.Flow[Boolean,Boolean,akka.NotUsed] required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]。如果有人能将此示例更新到最新版本的 Akka Streams,我将不胜感激。 - radumanolescu
@RamonJ,您能否解释一下mapAsync将在哪个线程上执行?它会使用与图形相同的调度程序线程吗(对于此,Akka Streams会创建一个Actor,除非您创建异步边界)? - beinghuman
@beinghuman 鉴于 mapAsync 不接受 ExecutionContext,我认为它使用的是与图形本身相同的调度程序。但是,我还没有阅读代码来确认这一点。 - Ramón J Romero y Vigil
@radumanolescu以上回答的任何部分都没有使用“Boolean”,因此您的错误消息似乎没有使用“此示例”。随时在stackoverflow上提问,我会尝试查看... - Ramón J Romero y Vigil
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接