从演员中进行的异步调用速率限制

4
我们有一个相当高吞吐量的Actor系统,通过http对外部系统进行异步调用。我们发现由于我们接收到的调用数量,下游系统正在被压垮。
使用“管道到”模式在此处描述的向下流系统的调用:https://petabridge.com/blog/akkadotnet-async-actors-using-pipeto/ 之所以会向下游系统发出这么多调用,是因为Actor在处理完消息后(异步调用启动时),不等待来自异步调用的响应即处理其邮箱中的下一条消息。显然,这是按设计而来的,但在这些情况下,它导致向外部服务发出非常多的异步调用。
我们需要一种限制调用的方法。我能想到几种可能的解决方案来解决这个问题。
  1. 通过等待任务完成,同步执行对外部服务的调用。为演员设置一个池路由器,基本上是一种限制对该外部服务进行调用的方式。

  2. 使用ReceiveAsync方法而不是Receive。这基本上与选项1完全相同。在我上面发布的petabride页面上,它关于此方法的说明是 - “只是别这样做” :)

  3. 在进行异步调用之前,开始存储任何传入的消息,然后在异步任务完成后将其取消存储。显然,使用此方法吞吐量要受到更大的限制。

我想知道是否有人在使用akka时遇到类似的问题并能够解决?

编辑:

最终我们只使用了选项1才解决了问题。即使用具有特定IO调用等待的Receive()的池路由器(对外部系统的api调用)。这似乎运行得很好,我们可以通过设置池大小来控制“限流”。

我们尝试了选项2(ReceiveAsync),但发现系统在某个时刻会变得不响应而没有抛出任何错误。我们怀疑它陷入了死锁状态。这可能是由于异步关键字的工作方式与仅使用.Wait()或.Result等待任务的方式不同。我现在可以看到Petabridge为什么建议不要使用ReceiveAsync :)
我们没有尝试选项3,因为这将意味着更重大的更改。
2个回答

2
对于我来说,我通过创建一个带路由器作为工作者的子actor来解决这个问题,这些工作者只能处理一条消息。因此,您可以使用多个工作者配置外部系统的负载。此外,这可以让您能够使用一致性哈希来避免对某些消息进行并行处理。
至于工作者 - 在一个项目中,我使用了第一种方法,但使用固定调度程序为工作者 - 所以它们总是有相同的线程来处理消息,并且不影响其他系统部分。如果您有一个相当恒定的负载,这很好。

谢谢回复,听起来我们的方法与您的相似,这很好...一旦我们确定了适合我们的方案,稍后会在这里发布答案。 - lachy

2
实际上,第二个选项(ReceiveAsync)是您问题的完美解决方案。唯一的风险是在这种情况下,您会减缓发送方,因为Actor现在将异步等待HTTP请求完成。这意味着如果高速率的消息将继续不断地推送到Actor本身上,它可能会被压垮。
如果是这种情况,您可以选择:
  1. 增加消费者的数量(HTTP连接另一端的监听器),以跟上节奏。
  2. 使用Akka Streams来建模您的问题,而不是使用Actors。Streams内置支持反压,可以在上游应用,直到达到请求跟踪的原始源。

谢谢回复。我们打算尝试使用池路由器的ReceiveAsync,并进行一些负载测试,看看效果如何。如果这对我们来说效果不好,可能会考虑使用Akka Streams。 - lachy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接