一个多进程 Manager 如何创建多个队列

5
我正在编写一个脚本,将使用Python的多进程和线程模块。为了您的理解,我会生成与可用核心数量相等的进程,并在每个进程内部启动25个线程。每个线程从input_queue中消耗并生产到output_queue。对于队列对象,我使用multiprocessing.Queue
在我的第一次测试后,我出现了死锁,因为负责喂养和刷新队列的线程挂起了。过了一段时间,我发现可以使用Queue().cancel_join_thread() 来解决这个问题。
但是由于存在数据丢失的可能性,我想使用: multiprocessing.Manager().Queue() 现在的问题是: 使用一个管理器对象来处理每个队列是否更好?还是应该创建一个管理器并从同一个管理器对象中获取两个队列?
# One manager for all queues
import multiprocessing

manager = multiprocessing.Manager()
input_queue = manager.Queue()
output_queue = manager.Queue()

...Magic...

# As much managers as queues
manager_in = multiprocessing.Manager()
queue_in = manager_in.Queue()

manager_out = multiprocessing.Manager()
queue_out = manager_out.Queue()

...Magic...

感谢您的帮助。
1个回答

8
不需要使用两个单独的 Manager 对象。正如您已经看到的那样,Manager 对象允许在多个进程之间共享对象;来自文档的说明:

管理器提供一种创建可在不同进程之间共享的数据的方法。管理器对象控制管理共享对象的服务器进程。其他进程可以通过使用代理来访问共享对象。

因此,如果您有两个不同的队列,仍可以使用同一个管理器。以下是使用一个管理器和两个队列的简单示例,希望对某些人有所帮助:
from multiprocessing import Manager, Process
import time


class Worker(Process):
    """
    Simple worker.
    """

     def __init__(self, name, in_queue, out_queue):
        super(Worker, self).__init__()
        self.name = name
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            # grab work; do something to it (+1); then put the result on the output queue
            work = self.in_queue.get()
            print("{} got {}".format(self.name, work))
            work += 1

            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)

            # put the transformed work on the queue
            print("{} puts {}".format(self.name, work))
            self.out_queue.put(work)


if __name__ == "__main__":
    # construct the queues
    manager = Manager()
    inq = manager.Queue()
    outq = manager.Queue()

    # construct the workers
    workers = [Worker(str(name), inq, outq) for name in range(3)]
    for worker in workers:
        worker.start()

    # add data to the queue for processing
    work_len = 10
    for x in range(work_len):
        inq.put(x)

    while outq.qsize() != work_len:
        # waiting for workers to finish
        print("Waiting for workers. Out queue size {}".format(outq.qsize()))
        time.sleep(1)

    # clean up
    for worker in workers:
        worker.terminate()

    # print the outputs
    while not outq.empty():
        print(outq.get())

使用两个管理器可以这样实现:

# construct the queues
manager1 = Manager()
inq = manager1.Queue()
manager2 = Manager()
outq = manager2.Queue()

这个功能可以正常运行,但是并不必要。


谢谢。这正是我实现管理器和队列的方式。 - dvonessen
@Paul 我在 __init__ 中加了一行代码 print("pid: {}".format(os.getpid())),结果发现所有这样创建的进程都有相同的 pid,因此共享同一内存?如何使用相同的排队机制创建不同的进程? 我尝试了:workers = [Process(target = Worker, args = ((str(name), inq, outq))) for name in range(3)] 但是工作进程似乎无法从 'inq' 收到消息。 非常感谢您的帮助! - Jimmy
1
@Jimmy 尝试将相同的 print 放入 Worker 进程的 run(..) 方法中。你会发现这段代码正在不同的进程中运行,因此它们确实是不同的进程。我现在找不到一个好的链接,但是虽然 __init__ 在主进程上运行,但还有另一个进程被 forked/spawned,并且那里将执行 run - Paul
@Paul 很有趣。感谢您的回复。 - Jimmy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接