为什么我使用multiprocessing.Process时,ZeroMQ无法进行通信?

5
请看下面的代码:

server.py

import zmq 
import time
from multiprocessing import Process
class A:
  def __init__(self):
    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.PUB)
    sock.bind('ipc://test')
    p = Process(target=A.run, args=(sock,))
    p.start()     # Process calls run, but the client can't receive messages
    p.join()      #
    #A.run(sock)  # this one is ok, messages get it to be received

  @staticmethod
  def run(sock):
    while True:
      sock.send('demo'.encode('utf-8'))
      print('sent')
      time.sleep(1)

if __name__ =='__main__':
  a = A()

client.py

import zmq 
ctx=zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
sock.connect('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '') 
while True:
  print(sock.recv())

server.py的构造函数中,如果我直接调用.run()方法,客户端可以收到消息,但是当我使用multiprocessing.Process()方法时,它会失败。有人能解释一下这个问题,并提供一些建议吗?
2个回答

3

Q : "当我使用multiprocessing.Process运行时,为什么ZeroMQ无法通信?"

实际上,ZeroMQ并没有通信失败,问题在于Python的multiprocessing模块“操作”方式。

该模块被设计成一些处理可以逃离Python中央GIL锁(再次串行化,用作永久存在的并发情况的主要避免者)。

这意味着调用multiprocessing.Process会生成一个精确的Python解释器状态的“镜像副本”,导出到新的操作系统生成的进程中(详细信息取决于localhost操作系统)。

鉴于此,"镜像"副本没有机会访问已经由__main__拥有的资源-在这里,.bind()方法已经获取了ipc://test地址,因此"远程"进程将永远无法获得"权限"来触及这个ZeroMQ接入点,除非代码得到修复和完全重构。

Q : "有人能解释一下这个问题并提供一些建议吗?"

当然。开始的最好步骤是充分理解Pythonic文化中垄断GIL锁重新[SERIAL]化,即没有两件事情会同时发生,因此即使添加更多线程也无法加快处理流程的速度,因为所有内容都会被中心“垄断者”GIL锁重新对齐。

其次,理解完全反映Python解释器状态的承诺,虽然听起来很有前途,但也有一些明显的缺点 - 新进程作为“镜像”副本不能在已拥有的资源上引入冲突情况。如果它们试图这样做,则在这种原则上设计不良的情况下,“未按预期工作”的情况是较轻微的问题。

在你的代码中,__main__ 中的第一行实例化了 a = A(),其中 A.__init__ 方法直接占用了 IPC 资源,因为它调用了 .bind('ipc://test')。稍后的步骤 p = Process( target = A.run, args = ( sock, ) ) “镜像”复制了 Python 解释器的状态(完全相同的副本),而 p.start() 由于已经被 __main__ 拥有同样的资源(是的,在 .bind('ipc://test') 中指令调用“镜像”进程抓取同样不自由的资源 ipc://test)。这永远不会成功。
最后但并非最不重要的是,享受 Martin SUSTRIK 的杰作—— 的零之禅,它非常适合于极具可扩展性、几乎零延迟、非常舒适、广泛移植的信令和消息框架。

2
简短回答:启动你的子进程。在每个子进程中从Producer.run()方法中创建你的zmq.Context.Socket实例。在基数为1的一侧使用.bind()方法,在基数大于1的一侧(即“服务器”)使用.connect()方法。请保留HTML标签。
我的做法结构化如下: # server.py :
    import zmq
    from multiprocessing import Process

    class Producer (Process):
    
      def init(self):
        ...
    
      def run(self):
        ctx = zmq.Context(1)
        sock = zmq.Socket(ctx, zmq.PUB)
        # Multiple producers, so connect instead of bind (consumer must bind)
        sock.connect('ipc://test')
        while True:
          ...
    
    if __name__ == "__main__":
      producer = Producer()
      p = Process(target=producer.run)
      p.start()
      p.join()

# client.py :

    import zmq

    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.SUB)
    # Capture from multiple producers, so bind (producers must connect)
    sock.bind('ipc://test')
    sock.setsockopt_string(zmq.SUBSCRIBE, '') 
    while True:
      print(sock.recv())


抱歉,这是不可能的 - 你承诺在每个子进程中创建一个工作解决方案是错误的,并且会因为与已拥有的资源冲突而失败(每个后续的.bind()将拒绝成功完成,一旦第一个成功拥有地址)。 - user3666197
你说得对,我在关注上下文和套接字时忽略了绑定/连接方向。我会更新我的答案。 - SamR

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接