多进程池和队列

8

我正在使用进程池的多进程技术。我需要将一个结构体作为参数传递给一个函数,在不同的进程中使用它。我遇到了一个问题,就是在multiprocessing.Pool的映射函数中,既不能复制Pool.Queue,也不能复制Pool.Array。这个结构体将会被用来实时记录每个终止进程的结果。以下是我的代码:

import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time

def do_work(number, out_queue=None):
    if out_queue is not None:
        print "Treated nb ", number
        out_queue.append("Treated nb " + str(number))
    return 0


def multi_run_wrapper(iter_values):
    return do_work(*iter_values)

def test_pool():
    # Get the max cpu
    nb_proc = multiprocessing.cpu_count()

    pool = multiprocessing.Pool(processes=nb_proc)
    total_tasks = 16
    tasks = range(total_tasks)

    out_queue= Queue()  # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function.
    out_array = Array('i', total_tasks)
    iter_values = itertools.izip(tasks, itertools.repeat(out_array))
    results = pool.map_async(multi_run_wrapper, iter_values)

    pool.close()
    pool.join()
    print results._value
    while not out_queue.empty():
        print "queue: ", out_queue.get()
    print "out array: \n", out_array

if __name__ == "__main__":
    test_pool()

我需要在一个独立的进程中启动一个工作进程,并将我的输出队列作为参数传递。我还想指定包含有限数量运行进程的池。为此,我使用 pool.map_async() 函数。不幸的是,上面的代码片段给了我一个错误:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

我认为问题出在Queue无法被复制,这也是我在文档中读到的原因。于是我考虑将队列设为全局变量,这样就不需要再传递参数了,但我认为这会很混乱。我也想过使用multiprocessing.Array代替。

out_array = Array('i', total_tasks)

但是与队列一样,会出现相同的错误:

# ...
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

我需要使用多进程和子进程之间交换信息的功能,因为我的软件比较大,所以希望我的代码保持简洁整洁。

有什么优雅的方法可以将队列传递给我的工作进程吗?

当然,欢迎任何其他处理主要规范的方式。

1个回答

17

multiprocessing.Pool 的工作队列不接受 multiprocessing.Queue 作为参数。我认为这是因为它内部使用队列来向工作进程发送和接收数据。有几种解决方法:

1)你真的需要使用队列吗?Pool 函数的一个优点是其返回值会被发送回主进程。通常最好从池中迭代返回值而不是使用单独的队列。这也避免了通过检查 queue.empty() 引入的竞争条件。

2)如果你必须使用 Queue,可以使用 multiprocessing.Manager 中的一个。这是一个代理到共享队列的对象,可以作为参数传递给 Pool 函数。

3)你可以通过在创建 Pool 时使用 初始化程序(如 https://dev59.com/xm865IYBdhLWcg3wat2l#3843313)将普通的 Queue 传递给工作进程。这有点 hacky。

我上面提到的竞争条件来自于:

while not out_queue.empty():
    print "queue: ", out_queue.get()

当您有工作进程填充队列时,可能会出现队列当前为空的情况,因为工作进程即将向其中放置某些内容。如果此时检查 .empty(),则会提前结束。更好的方法是在队列中放置哨兵值,以便在完成向其放置数据时发出信号。


你给了我许多有价值的建议,谢谢。请放心,竞争条件只是用作虚拟测试 ;) - kaligne
如果将它作为init参数传递,那么它就会被接受。请参阅此处:https://dev59.com/xm865IYBdhLWcg3wat2l?rq=1 - Ciprian Tomoiagă

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接