使用异步共享的numpy数组的 Python 多进程:池与队列

5
我希望在一个规则的网格上生成周期性的Perlin噪声。我需要生成多个地图,并且网格相当大,因此我想使用多进程,在每个核心上生成一个地图。
这些地图将被绘制在一个图形上,并依次放置在单个二进制dat文件中。这些地图将存储在单个numpy数组中,其大小为地图数*节点数,因此一个切片将是一个地图,因此我可以同时访问数组的不同区域而不必担心。
我参考了这个线程,它使用池和这个线程,在其中我使用队列来进行多进程绘图。
我有两段代码:使用队列的那段在我的电脑上运行良好,但在我实验室的工作站或我的专业笔记本电脑上无法正常工作:没有错误消息,它只是在某些时候会冻结。第二段代码很好地解决了这个问题,我认为它比第一个示例更简单,因为我直接写入numpy数组。(我没有完全理解第一个链接的异步情况所需的所有函数和init的需要。)
我的问题是:为什么我的第一段代码有问题?我只在下面放置了我认为相关的代码片段。
感谢您的帮助。
第一次尝试:
def generate_irradiation_maps(rad_v):
    while tasks_queue.empty() == False:
        print("fetching work ...")
        map_index = tasks_queue.get()  # get some work to do from the queue
        print("----> working on map: %s" % map_index)
        perm = range(permsize)
        random.shuffle(perm)
        perm += perm
        for i in range(nb_nodes):
            # call the perlin function: fBm
            rad_v[map_index, i] = fBm(perm, x[i] * freq, y[i] * freq, int(sizex *     freq), int(sizey * freq), octs, persistance)
        rad_v[map_index, :] = rad_v[map_index, :] + abs(min(rad_v[map_index, :]))
        rad_v[map_index, :] = rad_v[map_index, :] / max(rad_v[map_index, :])
        figure = plt.figure(figsize=(20, 7))
        plt.tricontourf(x, y, rad_v[map_index, :])
        plt.axis('image')
        plt.colorbar(shrink=.5)
        figure.savefig('diff_gb_and_pf_irrad_c_map_' + str(map_index) + '.png')
        plt.clf()
        plt.close()
        tasks_queue.task_done()  # work for this item finished

start_time = time.time()
nb_maps = 10
nb_proc = 1  # number of processes

print("generating %d irradiation maps" % nb_maps)
irrad_c_base_array = mp.Array(ctypes.c_double, nb_maps * nb_nodes)  
irrad_c = np.frombuffer(irrad_c_base_array.get_obj())
irrad_c = irrad_c.reshape(nb_maps, nb_nodes)

tasks_queue = mp.JoinableQueue()  # a queue to pile up the work to do

jobs = list(range(nb_maps))  # each job is composed of a map
print("inserting jobs in the queue...")
for job in jobs:
    tasks_queue.put(job)
print("done")

# launch the processes
for i in range(nb_proc):
    current_process = mp.Process(target=generate_irradiation_maps, args=(irrad_c,     ))
    current_process.start()

# wait for all tasks to be treated
tasks_queue.join()

第二次尝试:

def generate_irradiation_maps(arg_list):
    map_index = arg_list[0]
    print('working on map %i ' % map_index)
    perm = range(permsize)
    random.shuffle(perm)
    perm += perm
    for i in range(nb_nodes):
        arg_list[1][i] = fBm(perm, x[i] * freq, y[i] * freq, int(sizex * freq),     int(sizey * freq), octs, persistance)
    arg_list[1][:] = arg_list[1][:] + abs(min(arg_list[1][:]))
    arg_list[1][:] = arg_list[1][:] / max(arg_list[1][:])
# plot
figure = plt.figure(figsize=(20, 7))
#plt.tricontourf(x, y, rad_v[map_index, :])
plt.tricontourf(x, y, arg_list[1][:])
plt.axis('image')
plt.colorbar(shrink=.5)
figure.savefig('diff_gb_and_pf_irrad_c_map_' + str(map_index) + '.png')
plt.clf()
plt.close()


start_time = time.time()
nb_maps = 2
nb_proc = 2  # number of processes

print("generating %d irradiation maps" % nb_maps)
irrad_c_base_array = mp.Array(ctypes.c_double, nb_maps * nb_nodes)  # we build     shared array, accessible from all process. we don't access the same zones.
irrad_c = np.frombuffer(irrad_c_base_array.get_obj())
irrad_c = irrad_c.reshape(nb_maps, nb_nodes)

args = [[i,irrad_c[i,:]] for i in range(nb_maps)]

with closing(mp.Pool(processes=nb_proc)) as jobs_pool:
    jobs_pool.map_async(generate_irradiation_maps,args)
jobs_pool.join()
1个回答

0

我个人在多进程编程方面遇到了很多麻烦。这篇博客文章提出了一种可能的解决方案。如果你在 POSIX 和 Windows 操作系统之间切换(例如从 Linux、Unix 或 Mac),那么生成子进程的行为是不同的。博客文章的结尾建议添加以下代码行,以帮助防止进程死锁。

from multiprocessing import set_start_method
set_start_method("spawn")

很抱歉,您分享的代码不是自包含的,因此我无法测试它。如果您在不同的操作系统上执行代码,则可以尝试这个方法,看看是否有所帮助!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接