如何使用多进程实现发布/订阅模式?

4
有没有一种使用数据结构实现发布/订阅模式的方法?换句话说,我想要像队列一样的东西,但是发布者可以将单个命令同时发送给多个工作进程。
1个回答

22

您可以创建自己的数据结构,使用 multiprocessing.Queue 的封装来实现简单的发布/订阅模式:

您可以使用一个封装 multiprocessing.Queue 的类来实现发布/订阅模式。

import os
import multiprocessing
from functools import wraps


def ensure_parent(func):
    @wraps(func)
    def inner(self, *args, **kwargs):
        if os.getpid() != self._creator_pid:
            raise RuntimeError("{} can only be called in the "
                               "parent.".format(func.__name__))
        return func(self, *args, **kwargs)
    return inner

class PublishQueue(object):
    def __init__(self):
        self._queues = []
        self._creator_pid = os.getpid()

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_queues'] = []
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

    @ensure_parent
    def register(self):
        q = multiprocessing.Queue()
        self._queues.append(q)
        return q

    @ensure_parent
    def publish(self, val):
        for q in self._queues:
            q.put(val)

def worker(q):
    for item in iter(q.get, None):
        print("got item {} in process {}".format(item, os.getpid()))

if __name__ == "__main__":
    q = PublishQueue()
    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=worker, args=(q.register(),))
        p.start()
        processes.append(p)
    q.publish('1')
    q.publish(2)
    q.publish(None)  # Shut down workers

    for p in processes:
        p.join()

输出:

got item 1 in process 4383
got item 2 in process 4383
got item 1 in process 4381
got item 2 in process 4381
got item 1 in process 4382
got item 2 in process 4382

只要父进程是唯一执行发布操作的进程,并且您在父进程中为每个工作进程注册一个订阅队列,然后使用其multiprocessing.Process构造函数将该订阅队列传递给工作进程,这种模式将能够很好地运作。这些限制是由于multiprocessing.Queue无法被序列化所导致的。如果您需要将订阅队列传递给已经运行的工作进程,则需要调整实现以使用multiprocessing.Manager.Queue代替。


不确定为什么你被踩了。看起来这回答了问题。 - Twirrim
1
@Twirrim 我也不是!不过这就是生活。 - dano
如果队列是管理器队列,那么它们不是可挂起的吗?因为它们只是引用。 - CpILL
@CpILL 是的 - 我的回答最后一句提到了这一点。 - dano

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接