在Pool.imap调用的函数中,我能否使用一个多进程队列(multiprocessing Queue)?

35

我正在使用Python 2.7,并尝试将一些CPU密集型任务运行在它们自己的进程中。我希望能够向父进程发送消息,以便让其了解该进程的当前状态。多进程队列似乎非常适合此任务,但我不知道如何使其正常工作。

这是我的基本工作示例,没有使用队列。

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我已经尝试过多种方法将Queue传递给其他进程,但是一直收到错误信息“RuntimeError:Queue对象只应通过继承在进程之间共享”。这里是我基于早期找到的一个答案尝试的其中一种方式。(我在尝试使用Pool.map_async和Pool.imap时也遇到了相同的问题)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

最后,0适应度方法(使其全局)不会生成任何消息,只会锁死。

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我知道直接使用multiprocessing.Process可能会起作用,也有其他库可以完成此操作,但在确定不仅仅是我的知识缺乏阻止我能够利用它们之前,我不想放弃非常适合的标准库函数。

谢谢。


你考虑过使用Jug吗:http://luispedro.org/software/jug? - luispedro
2个回答

57

关键在于将队列作为参数传递给初始化程序。这似乎适用于所有池调度方法。

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()

7
multiprocessing.Poolinitializerinitargs 参数的作用和用处展示得非常好! - Chris Arndt
3
在Python中,每个函数都是一个对象(参见http://docs.python.org/reference/datamodel.html#the-standard-type-hierarchy Callable Types)。因此,f.q会在函数对象f上设置一个名为q的属性。这只是一种快速轻便的方法,以便稍后可以使用保存的Queue对象。 - Olson
2
f.q = q 不就是猴子补丁的一个例子吗?https://dev59.com/Mm035IYBdhLWcg3wGMBT - Matthew Cornell
1
这使我能够将多进程日志记录模式(http://plumberjack.blogspot.com.au/2010/09/using-logging-with-multiprocessing.html)应用于异步方法。 - Jaxor24
1
这样的肮脏代码。全局状态。将变量分配给函数对象。 - iperov
显示剩余3条评论

1

使用 fork 启动方法(即在 Unix 平台上),您不需要使用顶部答案中的那个初始化技巧。

只需将 mp.Queue 定义为全局变量,子进程就会正确继承它。

在 Linux 上使用 Python 3.9.7,OP 的示例可以正常工作(代码稍作调整):

import multiprocessing as mp
import time

q = mp.Queue()


def f(x):
    q.put(str(x))
    return x * x


def main():
    pool = mp.Pool(5)
    pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    for _ in range(1, 6):
        print(q.get())

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

输出:

2
1
3
4
5

虽然已经过去了12年,但我还是想确保任何遇到这个问题的Linux用户都知道最佳答案的技巧只在你无法使用fork时才需要。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接