multiprocessing.Queue behavior when forking

3
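After calling os.fork() several times, I want to exchange data with the child processes. For this I use multiprocessing.Queue instances. It works when the parent puts data and the children get it, but not the other way around. Here is my example code: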
import os
import multiprocessing as mp
from queue import Empty

if __name__ == '__main__':

    n_workers = 5

    forward_queue = mp.Queue()
    pids_queue = mp.Queue()

    for n in range(n_workers):
        forward_queue.put(n)

    for n in range(n_workers):
        child = os.fork()
        if child:
            pass
        else:
            my_number = forward_queue.get()
            print('pid={} here, my number is {}'.format(os.getpid(), my_number))
            pids_queue.put(os.getpid())
            os._exit(0)  # correct way to exit a fork according to docs

    while True:
        try:
            pid_of_child = pids_queue.get(timeout=5)
        except Empty:
            print('no more pids')
            break
        else:
            print('one of my children had this pid={}'.format(pid_of_child))

The output I get:

pid=19715 here, my number is 0
pid=19716 here, my number is 1
pid=19717 here, my number is 2
pid=19721 here, my number is 3
pid=19718 here, my number is 4
no more pids

The output I would expect:

pid=19715 here, my number is 0
pid=19716 here, my number is 1
pid=19717 here, my number is 2
pid=19721 here, my number is 3
pid=19718 here, my number is 4
one of my children had this pid=19715
one of my children had this pid=19716
one of my children had this pid=19717
one of my children had this pid=19721
one of my children had this pid=19718
no more pids

Can someone explain why this happens?
1 Answer

4

Try this before exiting the fork:

pids_queue.close()
pids_queue.join_thread()

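The problem is due to how Queues work. When you put a value on a queue, a background thread is started to transfer the item over the pipe. os._exit terminates the process immediately without any cleanup, so if you call it right after put, that thread is shut down before it has written the item. The .close and .join_thread methods exist for exactly this situation: close() declares that this process will put no more data on the queue, and join_thread() blocks until the background thread has flushed everything that was buffered.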
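As a rough sketch of how the fix fits into the question's code, the version below wraps the fork loop in a function and flushes the queue in each child before the hard exit. The function name collect_child_pids, its parameters, the try/finally guard, and the os.waitpid cleanup are illustrative additions, not part of the original post; it assumes a Unix system where os.fork() is available.

```python
import os
import multiprocessing as mp
from queue import Empty


def collect_child_pids(n_workers=3, timeout=5):
    """Fork n_workers children and return (forked PIDs, PIDs reported via the queue).

    Hypothetical helper for illustration; not from the original post.
    """
    pids_queue = mp.Queue()
    children = []

    for _ in range(n_workers):
        child = os.fork()
        if child:
            children.append(child)  # parent: remember the child's PID
        else:
            try:
                pids_queue.put(os.getpid())
                # Flush the background feeder thread before the hard exit.
                pids_queue.close()
                pids_queue.join_thread()
            finally:
                os._exit(0)  # never fall through into the parent's code

    reported = []
    for _ in range(n_workers):
        try:
            reported.append(pids_queue.get(timeout=timeout))
        except Empty:
            break  # a child exited without delivering its item

    for pid in children:
        os.waitpid(pid, 0)  # reap the children to avoid zombies

    return sorted(children), sorted(reported)


if __name__ == '__main__':
    forked, reported = collect_child_pids()
    print('forked:  ', forked)
    print('reported:', reported)
```

With the close() and join_thread() calls in place, the two lists should match; removing them is likely to reproduce the behavior from the question, where the parent times out without receiving any PID.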

2
I just wrote a similar answer. Note that you would probably be better off using multiprocessing.Process() rather than os.fork() - petre
2
@petre Yes, that would be better. I like to start from a clean process and hand it only the objects it needs. - Felix Kleine Bösing
2
Thanks. I would normally also go for the cleaner multiprocessing.Process(), but the full case this example was extracted from really does need a bare os.fork() - Lester Jack
1
Had the same problem, this solved it, thanks! - B.abba
