如何在Akka Flow中处理futures和mapAsync?

3

我建立了一个Akka图形DSL来定义一个简单的流程。但是,流程f4需要3秒钟才能发送一个元素,而f2需要10秒钟。

结果是:3、2、3、2。但是,这不是我想要的。由于f2花费太多时间,我想要得到:3、3、2、2。以下是代码...

implicit val actorSystem = ActorSystem("NumberSystem")
implicit val materializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(List(1, 1))
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))



  val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
  val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

  val f1, f3 = Flow[Int]
  val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper)
  val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper2)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})
g.run()

我到底做错了什么?是在使用future或mapAsync时出现了问题吗?还是别的什么问题...谢谢。
3个回答

1

正如你已经发现的那样,替换:

mapAsync(i => Future{i + delta})

使用:

map(_ + delta).async

在这两个流中,您可以实现所需的结果。

不同的结果归结于mapAsyncmap + async之间的关键区别。虽然mapAsync可以在并行线程中执行Futures,但多个mapAsync流阶段仍由相同的底层actor管理,在执行之前会进行操作融合(通常是为了成本效益)。

另一方面,async实际上将异步边界引入了流程中,由单独的actor处理每个流阶段。在您的情况下,这两个流阶段各自向下游发射元素,并且先发出的元素将首先被消耗。不可避免地,管理异步边界的流程会产生一些成本,Akka Stream使用窗口缓冲策略来分摊成本(请参见此Akka Stream文档)。

关于mapAsyncasync的区别,您可以查看这篇博客文章以获得更多细节。


1
抱歉,我在Akka方面还很新,正在学习中。为了获得预期的结果,一种方法是将async放置在代码中:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(List(1, 1))
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))



  val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
  val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

  val f1, f3 = Flow[Int]
  val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).map(_+1)
    //.mapAsyncUnordered[Int](2)(yourMapper)
  val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).map(_+2)
    //.mapAsync[Int](2)(yourMapper2)

  in ~> f1 ~> bcast ~> f2.async ~> merge ~> f3 ~> out
  bcast ~> f4.async ~> merge
  ClosedShape
})
g.run()

0

所以你正在尝试将f2和f4的结果合并在一起。在这种情况下,您正在尝试执行有时称为“散集模式”的操作。

我认为没有现成的方法来实现它,除非添加一个自定义有状态阶段,该阶段将跟踪来自f2和f4的输出,并在两者都可用时发出记录。但是需要注意一些复杂性:

  • f2/f4如果失败会发生什么
  • 如果它们花费太长时间会发生什么
  • 您需要为每个输入记录设置唯一键,以便您知道来自f2的哪个输出对应于f4(反之亦然)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接