在使用多进程时克服内存限制

6
我正在使用进化算法(CMAES)进行函数优化。为了更快地运行,我使用了多进程模块。我需要优化的函数在下面的代码中输入大矩阵 (input_A_Opt 和 input_B_Opt)

它们的大小达到几个GB。当我不使用多进程运行函数时,它可以正常工作。但是当我使用多进程时,似乎存在内存问题。如果我使用小的输入运行它,那么它可以正常工作,但是当我使用完整的输入运行时,就会出现以下错误:

File "<ipython-input-2-bdbae5b82d3c>", line 1, in <module>
opt.myFuncOptimization()

File "/home/joe/Desktop/optimization_folder/Python/Optimization.py", line 45, in myFuncOptimization
**f_values = pool.map_async(partial_function_to_optmize, solutions).get()**
File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks
put(task)
File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))

File "/usr/lib/python3.5/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)

error: 'i' format requires -2147483648 <= number <= 2147483647

这是代码的简化版本(如果我使用输入的大小减小10倍运行它,一切正常):

import numpy as np
import cma
import multiprocessing as mp
import functools
import myFuncs
import hdf5storage



def myFuncOptimization ():

    temp = hdf5storage.loadmat('/home/joe/Desktop/optimization_folder/matlab_workspace_for_optimization')    

    input_A_Opt  = temp["input_A"]
    input_B_Opt  = temp["input_B"]

    del temp

    numCores = 20

    # Inputs
   #________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________
    P0 = np.array([            4.66666667, 2.5,    2.66666667, 4.16666667, 0.96969697,     1.95959596,     0.44088176,     0.04040404,     6.05210421,     0.58585859,     0.46464646,         8.75751503,         0.16161616,             1.24248497,         1.61616162,                 1.56312625,         5.85858586,                 0.01400841, 1.0,            2.4137931,      0.38076152, 2.5,    1.99679872      ])
    LBOpt = np.array([         0.0,        0.0,    0.0,        0.0,        0.0,            0.0,            0.0,            0.0,            0.0,            0.0,            0.0,                0.0,                0.0,                    0.0,                0.0,                        0.0,                0.0,                        0.0,        0.0,            0.0,            0.0,        0.0,    0.0,            ])
    UBOpt = np.array([         10.0,       10.0,   10.0,       10.0,       10.0,           10.0,           10.0,           10.0,           10.0,           10.0,           10.0,               10.0,               10.0,                   10.0,               10.0,                       10.0,               10.0,                       10.0,       10.0,           10.0,           10.0,       10.0,   10.0,           ])
    initialStdsOpt = np.array([2.0,        2.0,    2.0,        2.0,        2.0,            2.0,            2.0,            2.0,            2.0,            2.0,            2.0,                2.0,                2.0,                    2.0,                2.0,                        2.0,                2.0,                        2.0,        2.0,            2.0,            2.0,        2.0,    2.0,            ])
    minStdsOpt = np.array([    0.030,      0.40,   0.030,      0.40,       0.020,          0.020,          0.020,          0.020,          0.020,          0.020,          0.020,              0.020,              0.020,                  0.020,              0.020,                      0.020,              0.020,                      0.020,      0.050,          0.050,          0.020,      0.40,   0.020,          ]) 

    options = {'bounds':[LBOpt,UBOpt], 'CMA_stds':initialStdsOpt, 'minstd':minStdsOpt, 'popsize':numCores}
    es = cma.CMAEvolutionStrategy(P0, 1, options)

    pool = mp.Pool(numCores)

    partial_function_to_optmize = functools.partial(myFuncs.func1, input_A=input_A_Opt, input_B=input_B_Opt)

    while not es.stop():
        solutions = es.ask(es.popsize)            
        f_values = pool.map_async(partial_function_to_optmize, solutions).get()   
        es.tell(solutions, f_values)
        es.disp(1)
        es.logger.add()

    return es.result_pretty()

有什么建议可以解决这个问题吗?我是不是编码不当(对Python很新)或者应该使用其他多进程包,比如Scoop?


你使用了太多的内存!查阅共享内存(https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes)以了解哪些值不需要被复制。 - tcooc
1个回答

2

您的对象过大,无法在进程之间传递。您正在传递超过2147483647字节的数据 - 超过了2GB!协议并不适用于此,并且序列化和反序列化如此大块的数据的纯粹开销可能会导致严重的性能开销。

减少传递给每个进程的数据大小。如果您的工作流允许,请在单独的进程中读取数据,并仅传递结果。


谢谢MisterMiyagi。是的,我传递的每个矩阵都超过2GB。虽然需要这些数据。我会尝试您建议的在每个进程中读取数据的方法,实际上是可行的。这将增加每个迭代所需的时间,因为我需要在每次评估时读取数据(成千上万次),而不是在进程开始时只读取一次。可能增加的时间约为25%,还算可以接受。 - Joe
顺便问一下,你知道还有没有其他能够处理这么大量数据的并行处理框架(例如scoop)吗? - Joe
跟进问题 - 同时有20个并行进程读取同一个文件会有问题吗? - Joe
@Joe,I/O主要由你正在读取的媒体所决定。现代消费级硬盘只适合两个并行读者。SSD可以支持20个或更多并发读者。RAID镜像实际上可以与设备数量线性扩展。如果你处理的数据量在TB以上,分布式并行文件系统是最好的选择。 - MisterMiyagi
谢谢你的回答。我实现了你的建议。我发现了其他问题。请看这里的帖子,如果你有任何建议,请告诉我。 - Joe
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接