你能否让 Erlang 进程的消息队列溢出?

4
我还在学习Erlang阶段,所以我可能会有错误,但这就是我理解进程消息队列的方式。一个进程可能处于其主要接收循环中,接收某些类型的消息,而后它可能会被放入等待循环中,以处理第二个循环中不同类型的消息。如果进程在第二个循环中接收到了为第一个循环设计的消息,它将把它们放入队列中,暂时忽略它们,并仅处理当前循环中可以匹配的那些消息。现在,如果它再次进入第一个接收循环,它将从头开始,并再次处理它可以匹配的消息。现在我的问题是,如果这就是Erlang的工作方式,我理解得正确,那么当恶意进程发送大量永远不会被处理的消息时会发生什么?队列最终会溢出,导致进程崩溃,还是应该如何处理?我将打出一个示例来说明我的意思。如果一个恶意程序获得了Pid并重复执行Pid!{malicioudata,LotsOfData},那么这些消息是否会被过滤掉,因为它们永远不可能被处理,还是只会堆积在队列中?
startproc() -> firstloop(InitValues).

firstloop(Values) ->
  receive
    retrieveinformation ->
      WaitingList=askforinformation(),
      retrieveloop(WaitingList);
    dostuff ->
      NewValues=doingstuff(),
      firstloop(NewValues);
    sendmeyourdata ->
      sendingdata(Values),
      firstloop(Values)
  end.

retrieveloop([],Values) -> firstloop(Values).
retrieveloop(WaitingList,Values) ->
  receive
    {hereismyinformation,Id,MyInfo} ->
      NewValues=dosomethingwithinfo(Id,MyInfo),
      retrieveloop(lists:remove(Id,1,WaitingList),NewValues);
  end.

我猜在最好的情况下,你会耗尽内存。但我的问题是:一个“恶意程序”如何与erlang的PIDs交互,攻击者是否嗅探了你的节点cookie?如果是这样,队列溢出是可能发生的最小的坏事。如果出现内存泄漏,如果erlang由于内存问题而崩溃,则可以读取生成的崩溃转储。 - Evan P
在普通情况下,Erlang 的设计假定它在一个受信任的环境中运行。虽然有其他不做此假设的集群方法,但它们只是附加组件,而不是普通 Erlang 运行时配置。这个假设是你直接控制正在操作的集群。在你认为“哦,那太糟糕了”之前,请意识到 Erlang 中的套接字编程非常容易。因此,你可以相对容易地构建任何元服务或微服务的集群。 - zxq9
2个回答

2

消息数量没有硬性限制,也没有固定的内存限制,但是如果您有数十亿条消息(或者几个超级巨大的消息),您肯定会用尽内存。

在您因为巨大的邮箱而 OOM 之前,您会注意到 选择性接收需要很长时间(并不是“选择性接收”总是一个好的模式……)或者无意中窥视进程邮件队列,发现您已经在终端中打开了潘多拉魔盒

在 Erlang 的世界里,这通常被视为限流和监控问题。如果您无法跟上,而且您的问题是可并行化的,那么您需要更多的工作进程。如果您的硬件已经达到极限,那么您需要更高的效率。如果您仍然达到了硬件极限,无法获得更多资源,并且仍然不堪重负,则需要决定如何实现推回或负载分担。


1
@Evalon 这是一个非常好的讨论。并且还有友好的图片,使得讨论更易懂! :-) - zxq9

1
很不幸,没有“消息队列溢出”,它会继续增长,直到 VM 由于内存分配错误而崩溃。
解决方案是在主循环中丢弃任何无效的消息,因为您不应该接收到任何 {hereismyinformation, _,_} 或在 askforinformation() 中由于进程阻塞而收到的消息。
startproc() -> firstloop(InitValues).

firstloop(Values) ->
  receive
    retrieveinformation ->
      WaitingList=askforinformation(),
      retrieveloop(WaitingList, Values); % i assume you meant that
    dostuff ->
      NewValues=doingstuff(),
      firstloop(NewValues);
    sendmeyourdata ->
      sendingdata(Values),
      firstloop(Values);
    _ -> 
      firstloop(Values) % you can't get {hereismyinformation, _,_} here so we can drop any invalid message
  end.

retrieveloop([],Values) -> firstloop(Values).
retrieveloop(WaitingList,Values) ->
  receive
    {hereismyinformation,Id,MyInfo} ->
      NewValues=dosomethingwithinfo(Id,MyInfo),
      retrieveloop(lists:remove(Id,1,WaitingList),NewValues);
  end.

这并不是意外消息的问题,因为它很容易避免,但当进程队列增长速度超过处理速度时,就会出现问题。针对这个具体问题,生产系统中有一个很好的作业框架


问题在于,当接收到“retrieveinformation”时,它应该请求信息,更新自己的值,然后再开始处理其他消息。因此,如果有人要求进程的值,则进程将首先完成更新,仅在完成后发送新值。因此,是否应添加第三个参数“RequestingStateList”,然后在状态更新时迭代该列表,发送当前状态或其他内容? - Peter Raeves
@PeterRaeves 我没明白。我已编辑了我的回答。如果你想像你所做的那样阻止它并删除意外消息,你只需要在你的代码中添加一行即可。只是以为你也想让它异步化。 - Łukasz Ptaszyński
嗯,原来如此简单啊...那确实是我所需要的。而且你说得没错,在第一个循环中我不会指望收到hereismyinformation的消息,如果它们被发送了,无法解释地出现了,那么它们可以安全地被丢弃。谢谢! - Peter Raeves
1
@PeterRaeves 没问题,猜起来并不太难 ;) - Łukasz Ptaszyński

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接