通过 Erlang 节点之间的消息传递吞吐量、超时和保证

4
现在假设我们正在设计一个应用程序,由2个Erlang节点组成。在节点A上,将有许多进程,数量可能达到数千个。这些进程通过向节点B上的已注册进程发送消息来访问节点B上的资源。

在节点B上,假设您已通过执行以下函数启动了一个进程:
start_server()->
    register(zeemq_server,spawn(?MODULE,server,[])),ok.<br>
server()-&gt;
    receive
            {{CallerPid, Ref}, {Module, Func, Args}} -&gt;
                    Result = (catch erlang:apply(Module, Func, Args)),
                    CallerPid ! {Ref, Result},
                    server();
            _ -&gt; server()
    end.

在节点A上,任何想要在节点B上执行给定模块中的任何函数的进程,都会使用以下代码片段:
call(Node, Module, Func, Args)-&gt;
        Ref = make_ref(),
        Me = self(),
        {zeemq_server,Node} ! {{Me, Ref}, {Module, Func, Args}},
        receive
                {Ref, Result} -&gt; Result
        after timer:minutes(3) -&gt; 
            error_logger:error_report(["Call to server took so long"]),
            {error,remote_call_failed}
        end.

假设节点B上的进程zeemq_server永远不会停机,并且节点A和B之间的网络连接始终正常,请回答以下问题:

问题1:由于节点B上只有一个接收进程,因此其邮箱很可能一直处于满载状态。这是因为节点A上的进程很多,在给定的时间间隔(例如2秒),每个进程至少向节点B服务器发起一次调用。在哪些方式下,可以使节点B上的接收变得冗余?例如,进程组等,并解释(概念)如何替换上面的服务器端代码。展示客户端侧将会发生什么变化。

问题2:在只有一个接收器在节点B中的情况下,进程邮箱中允许的最大消息数量是多少?如果单个进程邮箱被太多消息淹没,erlang会如何响应?

问题3:通过使用上述概念,有哪些方式可以确保每个发送请求的进程在超时发生之前尽快收到答案?将节点B上的接收部分转换为并行操作是否有帮助?就像这样:

start_server()-&gt;
    register(zeemq_server,spawn(?MODULE,server,[])),ok.<br>
server()-&gt;
    receive
            {{CallerPid, Ref}, {Module, Func, Args}} -&gt;
                    <b>spawn(?MODULE,child,[Ref,CallerPid,{Module, Func, Args}]),</b>
                    server();
            _ -&gt; server()
    end.    
child(Ref,CallerPid,{Module, Func, Args})-&gt;
    Result = (catch erlang:apply(Module, Func, Args)),
    CallerPid ! {Ref, Result},
    ok.

上述方法可能会增加在节点B上运行的瞬时进程数量,这可能会因为内存而严重影响服务。但是,它看起来很不错,并使server()循环立即返回以处理下一个请求。您对此修改有何看法?

最后:说明您将如何在节点B上实现一组接收器线程池,同时在节点A方面显示为一个名称。这样,传入的消息将在接收器线程之间进行复用,并在这组进程中共享负载。保持问题的含义不变。


这与Java有关吗? - Subhrajyoti Majumder
没有得到太多的答案,如果您不介意我问一下,最终您做了什么? - Roman Rabinovich
2个回答

2
一个进程邮箱中消息的最大数量是没有限制的,除非受到内存量的限制。此外,如果您需要检查邮箱大小,请使用:
erlang:process_info(self(),[message_queue_len,messages]).

这将返回类似以下的内容:
[{message_queue_len,0},{messages,[]}]

我建议你先将你的服务器转换成一个gen_server。这是你的工作进程。
接下来,我建议使用poolboy(https://github.com/devinus/poolboy)创建一个池,其中包含你的服务器实例作为poolboy的工作进程(在他们的github Readme.md中有示例)。最后,我建议创建一个专门用于调用者的模块,其中包含一个帮助方法,该方法创建一个poolboy事务并将池中的Worker参数应用于函数。以下是从他们的github抄袭的示例:
squery(PoolName, Sql) ->
    poolboy:transaction(PoolName, fun(Worker) ->
                                     gen_server:call(Worker, {squery, Sql})
                                  end).

话虽如此,也许 Erlang RPC 更适合您的需求?Erlang RPC 的详细信息可在http://www.erlang.org/doc/man/rpc.html找到。有关 Erlang RPC 的良好介绍,请参阅http://learnyousomeerlang.com/distribunomicon#rpc

0

在处理每个请求时生成一个新进程可能有些过度,但如果不知道每个请求需要做什么,很难说。

您可以拥有一组进程来处理每个消息,使用轮询方法分发请求或基于请求类型处理它,将其发送到子进程或生成进程。您还可以通过查看它们的消息队列并在负载过重时启动新的子进程来监视池化进程的负载。使用监督者..只需在init中使用send_after每隔几秒钟监视负载并相应地采取行动。如果可以,请使用OTP,虽然有开销,但是它是值得的。

我不会在专用线路通信中使用http,我认为这太多了。您可以使用一组进程来控制负载。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接