Akka Stream中map和mapAsync(1)的结果

3

我用了 mapAsync(1) 的代码没有达到我想要的效果。但是当我使用 Await.resultmapAsync(1) 改成了 map,它就能够正常工作。所以我有一个问题。

(A)使用 map(B)使用 mapAsync(1) 会在任何时候产生相同的结果吗?
// (A) Use map
someSource
 .map{r => 
   val future = makeFuture(r) // returns the same future if r is the same
   Await.result(future, Duration.Inf)
 }

// (B) Use mapAsync(1)
someSource
 .mapAsync(1){r =>
   val future = makeFuture(r) // returns the same future if r is the same
   future
}

实际上,我想粘贴我的真实代码,但它太长了,而且有一些依赖于我原始阶段的内容。


mapAsync 本身返回一个 Future[T],我认为没有必要再用 makeFuture 创建另一个 future 来包装它。 - Yuval Itzchakov
非常感谢您的回复。很抱歉,我无法理解您所说的内容。我认为 Source[O, Mat]#mapAsync[T](par: Int)(f: O => Future[T]) 返回的是 Source[T, Mat],而不是 Future[T] - ryo
makeFuture 的代码长什么样,r 的类型是什么? - cmbaxter
抱歉我的代码有些粗糙。在我的真实代码中,r 的类型是 akka.util.ByteString,但我认为它的类型可以是任何东西。 - ryo
感谢您和其他回答者的帮助,我的代码成功实现了我想要的功能。在制作复制代码的过程中,我发现了自己的错误。这些错误出现在我的原始GraphStage中。非常感谢! - ryo
2个回答

3
虽然在类型上两个流的语义最终相同 (Source [Int,NotUsed] ),但例子(A)展示的风格非常不好 - 请不要在流内部阻塞(Await)。

对于这种情况,mapAsync恰好是使用案例。您的操作返回一个Future[T],并且希望在未来完成后将该值向下推到流中。请注意,没有阻塞在mapAsync中,它会安排回调在内部推送未来的值,并在完成时执行此操作。
回答您关于"它们是否做同样的事情?"的问题,从技术上讲,是的,但第一个会在运行线程池时引起性能问题,请避免使用map +阻塞,而是使用mapAsync。

非常感谢您的回复。我想使用mapAsync,但在我的特定代码中,mapAsync不起作用。因此,我用(A)的方式将mapAsync替换为mapAwait。然后我的代码就可以工作了。但是我不明白为什么替换会影响结果。为了知道原因,我提出了问题。我想粘贴这段代码,但它太长了。因此,我将尝试用简短的代码模拟这种现象。 - ryo
我认为你的代码中一定还有其他问题,mapAsync 可能不是问题所在。请尝试找到一个可重现的例子。 - Konrad 'ktoso' Malawski
是的,你说得对!mapAsync并没有问题。感谢你和其他回答者,我的代码实现了我想要的功能。在制作复制代码的过程中,我发现了自己的错误。这个错误出现在我原来的GraphStage中。非常感谢! - ryo

1
这些调用在语义上非常相似,尽管使用Await进行阻塞可能不是一个好主意。当然,这两个调用的类型签名相同(Source[Int, NotUsed]),在许多情况下,这些调用将产生相同的结果(除了阻塞)。例如,以下内容包括定时future和故障的非默认监督策略,对于map内部带有Await和mapAsync,将产生相同的结果:
import akka.actor._
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps

object Main {

  def main(args: Array[String]) {

    implicit val system = ActorSystem("TestSystem")
    implicit val materializer = ActorMaterializer()
    import scala.concurrent.ExecutionContext.Implicits.global
    import system.scheduler

    def makeFuture(r: Int) = {
      akka.pattern.after(2 seconds, scheduler) {
        if (r % 3 == 0)
          Future.failed(new Exception(s"Failure for input $r"))
        else
          Future(r + 100)
      }
    }

    val someSource = Source(1 to 20)

    val mapped = someSource
      .map { r =>
        val future = makeFuture(r)
        Await.result(future, Duration.Inf)
      }.withAttributes(supervisionStrategy(resumingDecider))

    val mappedAsync = someSource
      .mapAsyncUnordered(1) { r =>
        val future = makeFuture(r)
        future
      }.withAttributes(supervisionStrategy(resumingDecider))

    mapped runForeach println
    mappedAsync runForeach println

  }

}

你的上游代码可能在某种程度上依赖于 map 调用中的阻塞行为。你能否提供一个简明扼要的问题复现?


谢谢您的回复。 您能否提供一个简明扼要的问题重现代码? 实际上,在询问之前,我尝试过制作一个简单的问题重现代码,但是我失败了。但我会再试一次。 - ryo
感谢您和其他回答者的帮助,我的代码成功实现了我想要的功能。在制作复制代码的过程中,我发现了自己的错误。这些错误出现在我的原始“GraphStage”中。非常感谢! - ryo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接