将numpy数组放入多进程队列中的困难

3

我有一些存储在numpy数组中的参数集,我将它们放入了一个多进程队列中,但是当工作进程接收到它们时它们变得混乱。下面是我的代码,以说明我的问题和疑问。

import numpy as np
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 2

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        print('Worker receives: ' + repr(args))
        id, par = args
        # simulate a complex task, and return result
        result = par['A'] * par['B']
        output.put((id, result))

# Define parameters to process
parameters = np.array([
    (1.0, 2.0),
    (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
for id, par in enumerate(parameters):
    obj = ('id_' + str(id), par)
    print('Submitting task: ' + repr(obj))
    task_queue.put(obj)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
results = {}
for i in range(len(parameters)):
    id, result = done_queue.get()
    results[id] = result

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))

在一台64位CentOS电脑上,使用numpy 1.4.1和Python 2.6.6,我的输出结果如下:

Submitting task: ('id_0', (1.0, 2.0))
Submitting task: ('id_1', (3.0, 3.0))
Worker receives: ('id_0', (2.07827093387802e-316, 6.9204740511333381e-310))
Worker receives: ('id_1', (0.0, 1.8834810076011668e-316))
results: {'id_0': 0.0, 'id_1': 0.0}

正如所示,当提交任务时,具有numpy记录数组的元组处于良好状态,但是当worker接收参数时,它们会乱码,并且结果不正确。我在multiprocessing documentation中读到,“代理方法的参数是可picklable的”。据我所知,numpy数组是完全可picklable的:
>>> import pickle
>>> for par in parameters:
...     print(pickle.loads(pickle.dumps(par)))
...     
(1.0, 2.0)
(3.0, 3.0)

我的问题是为什么参数在worker中没有被正确接收?我该如何将numpy记录数组的一行传递给worker?

2个回答

1
numpy数组应该是可序列化的(我想),但这里实际上涉及到numpy.void实例,不知道为什么,它们似乎无法被序列化。
如果您执行:
for par in parameters:
    print(type(par))
    print pickle.loads(pickle.dumps(par))

你得到:

你得到:

<type 'numpy.void'>
(-1.3918046672290164e-41, -1.3918046679677054e-41)
<type 'numpy.void'>
(-1.3918046672290164e-41, -1.3918046679677054e-41)

一种解决方法是应用parameters = parameters.reshape([-1, 1]),将你的(N,)数组变成(N,1)数组。这样当你循环遍历parameters时,你会得到大小为1的数组,希望这样做可以成功地进行pickle。希望这可以帮到你。

看起来你的问题是一个 bug,在这个提交中已经修复了 https://github.com/numpy/numpy/commit/971bab3d51726b95f5afe0c22cbbd7983023f626 - Mike T

0

我遇到了和你一样的问题,但我的情况与你略有不同。

最初,我在子进程中每个循环输出一个数字,并将它们组合成numpy.dnarray。最后,我将数组传递给队列,但是当我运行p.join()后,我的主进程无法启动。

旧代码如下:

# subprocess
for i in range(n):
    array[i] = data[i]
queue.put(array)
# main process
queue.get()

然而,我改变了另一种处理这类问题的方式

# subprocess
for i in range(n):
    queue.put((i, data[i]))
# main process
for i in range(n):
    while queue.empty():
        i, data = queue.get()
        array[i] = data

简单来说,我只是将我的数据分成了较小的部分(数据、位置),并将它们传递到队列中,主进程同步接收数据。希望这能有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接