Python多进程池队列通信

3

我正在尝试实现一个由两个并行运行且通过队列通信的进程池。

目标是使用队列,使写入者进程将消息传递给读取者进程。

每个进程都在终端上打印反馈以获取反馈信息。

以下是代码:

#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
        msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()
    mp.freeze_support()

    queue = mp.Queue()

    pool = mp.Pool()
    pool.apply_async(writer, (queue)) 
    pool.apply_async(reader, (queue))

    pool.close()
    pool.join()

我期望得到的输出应该像这样:

我期望得到的输出应该像这样:

Initialize the experiment PID: 2341
writer 2342 -> 1
reader 2343 -> 1
writer 2342 -> 2
reader 2343 -> 2
writer 2342 -> 3
reader 2343 -> 3
Done

然而,我只得到了这行代码:

Initialize the experiment PID: 2341

然后脚本就会退出。

在通过队列进行通信的进程池中实现两个进程的进程间通信的正确方法是什么?


1
你的 apply_async 正在吞噬一个异常,你不能像那样传递一个 queue。根据这个答案,只需将队列设置为全局或使用管理器派生队列即可。 - Mark
太好了,现在它可以工作了。非常感谢! - MaX
1个回答

6

我使用了mp.Manager().Queue()作为队列,因为我们不能直接传递Queue。尝试直接使用Queue会导致异常,但由于我们使用了apply_async,这些异常未被处理。

我已将您的代码更新为:

#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
        msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()
    manager = mp.Manager()

    queue = manager.Queue()

    pool = mp.Pool()
    pool.apply_async(writer, (queue,))
    pool.apply_async(reader, (queue,))

    pool.close()
    pool.join()

我得到了这个输出:
Initialize the experiment PID:  46182
### writer  46210  ->  1
--- reader  46211  ->  1
### writer  46210  ->  2
--- reader  46211  ->  2
### writer  46210  ->  3
--- reader  46211  ->  3
### Done
--- reader  46211  ->  Done

我相信这是你期望的内容。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接