在Windows上使用Python多进程处理大型数组

3
我在Linux平台上使用Python的多进程模块编写了一个脚本。当我尝试在Windows上运行程序时,发现直接运行不起来,这与Windows生成子进程的方式有关。似乎至关重要的是所使用的对象可以被pickle。我的主要问题是,我正在使用大型的numpy数组。似乎当它们达到一定大小时就无法再进行序列化了。为了将其简化为一个简单的脚本,我想做类似这样的事情:
### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

    def reverse_np_array(arr):
        arr = arr + 1
        return arr

    a = np.ndarray((200,1024,1280),dtype=np.uint16)

    def put_into_queue(_Queue,arr):
        _Queue.put(reverse_np_array(arr))


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

我收到了以下错误信息:
Traceback (most recent call last):
  File "Windows_multi.py", line 34, in <module>
    Process_list[i].start()
  File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i
n start
    self._popen = Popen(self)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i
n __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i
n dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())

所以,我基本上正在创建一个大数组,我需要在所有进程中对该数组进行计算并返回它。
一个重要的事情似乎是在语句 if __name__ = '__main__': 之前编写函数的定义。
如果将数组减少到 (50,1024,1280),整个程序就可以运行。 然而,即使启动了 4 个进程并且有 4 个核心在工作,它的速度也比仅为一个核心编写代码时慢(在 Windows 上)。因此,我认为我在这里还有另一个问题。
我的真实程序中的函数在一个 Cython 模块中。
我使用的是 Anaconda 包和 Python 32 位,因为我无法使用 64 位版本编译我的 Cython 包(我会在另一个线程中提问)。
任何帮助都受到欢迎!
谢谢! 菲利普
更新:
我犯的第一个错误是在 __main__ 中定义了一个 "put_into_queue" 函数。
然后我按建议引入了共享数组,但是它使用了大量的内存,并且所使用的内存随着我使用的进程而增加(当然不应该这样)。有什么想法吗?我在这里做错了什么吗?似乎定义共享数组的位置并不重要(在或者在__main__之外),虽然我认为它应该在__main__中。从这篇文章中得到的:Is shared readonly data copied to different processes for Python multiprocessing?
import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
    _Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
    arr = arr + 1 + np.random.rand()
    return arr
if __name__ == '__main__':


    #print shared_arra

    #a = np.ndarray((50,1024,1280),dtype=np.uint16)


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
       list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

这个问题的答案对你有帮助吗?https://dev59.com/9W3Xa4cB1Zd3GeqPeWkk#14593135。这个想法是创建共享数组,父进程和子进程都可以写入,而不是使用pickling。 - Andrew
嗨,谢谢回答,我尝试使用共享数组,但它不起作用,如上所述。有人知道为什么吗?干杯 - Fips
你正在将共享数组放入队列中。链接的示例没有这样做。从一个可工作的示例开始,验证它是否正常工作,并进行小的更改,直到它达到你想要/期望的行为。 - Andrew
谢谢提示!只是为了验证我是否正确理解了多进程和队列:如果我想要并行处理,需要输出,那么我必须使用队列,是吗?否则我就无法获取数据了?线程和/或队列(不是mp.queue)模块对我的应用程序更合适吗?因为我只想在数组的部分(其中“部分”等于核心数量)上执行独立操作。只是想看看是否值得让我退一步,检查我是否使用了正确的模块。再次感谢! - Fips
1个回答

0

你没有包含完整的回溯信息,结尾部分是最重要的。在我的32位Python中,我得到了相同的回溯信息,最终以此结束

  File "C:\Python27\lib\pickle.py", line 486, in save_string
    self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError是一个异常,它表示你的内存不足。

64位Python可以解决这个问题,但在multiprocessing中发送大量数据会很容易成为一个严重的瓶颈。


谢谢回复! 是的,你说得对。但现在我该如何解决这个问题呢?我认为一定有一种优雅的方法来处理这个问题,因为我认为人们处理更大的数组。在我的情况下,它不会成为瓶颈,因为我只发送一次数组(前向和后向)。 - Fips
@Fips 一个可能的解决方案是在共享内存中使用numpy数组 - Janne Karila

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接