多进程通过共享内存传递字典数组

6
以下代码可以工作,但由于传递大数据集而非常缓慢。在实际实现中,创建进程和发送数据所需的时间几乎与计算时间相同,因此到创建第二个进程时,第一个进程几乎已完成计算,从而使并行化变得毫无意义。
代码与此问题相同 Multiprocessing has cutoff at 992 integers being joined as result,建议的更改已经实现。然而,我遇到了通常发生的问题,即将大量数据进行pickle处理需要很长时间。
我看到有些答案使用multiprocessing.array来传递共享内存数组。我有一个大约有4000个索引的数组,但每个索引都有一个包含200个键/值对的字典。数据仅由每个进程读取,进行一些计算,然后返回一个矩阵(4000x3)(没有字典)。
像这样的答案Is shared readonly data copied to different processes for Python multiprocessing?使用map。是否可以保持以下系统并实现共享内存?是否有一种有效的方法将数据发送到每个带有字典数组的进程中,例如将字典包装在某个管理器中,然后将其放入multiprocessing.array 中?
import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':   
    main()

解决方法:

只需将字典列表放入管理器中,问题即可得到解决。

manager=Manager()
d=manager.list(myData)

看起来持有列表的经理也管理着该列表包含的字典。启动时间有点慢,因此似乎数据仍在被复制,但这只在一开始完成一次,然后在进程内部对数据进行切片。

import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)        
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

要使用共享内存,您需要将字典数组转换为 ctypes 对象,然后使用 multiprocessing.sharedctypes。我不确定这是否适用于您的用例。 - dano
@HaiVu 它返回每个进程的浮点数数组。结果=范围(0,(992))只是示例列表。 - user-2147482637
所以,在你的例子中,你的数据有50个输入,但我只看到3个矩阵返回。你是想返回50个矩阵吗? - Hai Vu
@HaiVu 返回的3个矩阵是因为有3个进程。每个进程返回一个22x3的矩阵,即22组(x,y,z)值。这些值是由输入数据计算得出的,该数据使用字典来定义地点的名称和其值。这样说清楚了吗? - user-2147482637
1
@user1938107 不可以将 multiprocessing.Manager.dict 对象放入 multiprocessing.sharedctype 对象中,只能放置 ctypes 对象。你可以使用 Manager 创建一个共享的 list,并在其中放置共享的 dict 实例,但我不确定这是否真的会给你带来更好的性能优势。 - dano
显示剩余5条评论
2个回答

2

看到您的问题,我假设以下内容:

  • 对于myData中的每个项目,您想要返回一个输出(某种矩阵)
  • 您创建了一个JoinableQueue(tasks),可能是用于保存输入,但不确定如何使用它

代码

import logging
import multiprocessing


def create_logger(logger_name):
    ''' Create a logger that log to the console '''
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # create console handler and set appropriate level
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(funcName)s() %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

def main():
    global logger
    logger = create_logger(__name__)
    logger.info('Main started')
    data = []
    for i in range(0,100):
        data.append({str(i):i})

    CalcManager(data,start=0,end=50)
    logger.info('Main ended')

def CalcManager(myData,start,end):
    logger.info('CalcManager started')
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Add tasks
    for i in range(start, end):
        tasks.put(myData[i])

    # Create processes to do work
    nprocs = 3
    for i in range(nprocs):
        logger.info('starting processes')
        p = multiprocessing.Process(target=worker,args=(tasks,results))
        p.daemon = True
        p.start()

    # Wait for tasks completion, i.e. tasks queue is empty
    try:
        tasks.join()
    except KeyboardInterrupt:
        logger.info('Cancel tasks')

    # Print out the results
    print 'RESULTS'
    while not results.empty():
        result = results.get()
        print result

    logger.info('CalManager ended')

def worker(tasks, results):
    while True:
        try:
            task = tasks.get()  # one row of input
            task['done'] = True # simular work being done
            results.put(task)   # Save the result to the output queue
        finally:
            # JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()


if __name__== '__main__':   
    main()

讨论

  • 对于多个进程的情况,我建议使用logging模块,因为它具有以下几个优点:
    • 它是线程和进程安全的;这意味着您不会出现一个进程的输出与另一个进程混在一起的情况
    • 您可以配置日志记录来显示进程名称、函数名称——非常方便调试
  • CalcManager本质上是一个任务管理器,它执行以下操作
    1. 创建三个进程
    2. 填充输入队列tasks
    3. 等待任务完成
    4. 打印结果
  • 请注意,在创建进程时,我将它们标记为daemon,这意味着它们将在主程序退出时被终止。您不必担心杀死它们
  • worker是工作的地方
    • 每个进程都永远运行(while True循环)
    • 每次循环,它们将获取一个输入单元,进行一些处理,然后将结果放入输出中
    • 任务完成后,它调用task_done(),以便主进程知道何时所有作业都已完成。我将task_done放在finally子句中,以确保即使在处理过程中出现错误,它也将运行

谢谢你的回答,我想你可能还没有看到我的更新,所以一直在问有关返回值的问题。这会改变我们的策略吗? - user-2147482637
对于大数据,这似乎不起作用。当任务和join被交换并且数据被卡住时,它看起来有与我的原始代码相同的问题。...虽然我真的不太理解正在发生什么。 - user-2147482637

2

使用multiprocessing.Manager将列表存储在管理器服务器中可能会有所改善,每个子进程通过从共享列表中提取项目来访问字典的项,而不是将切片复制到每个子进程中:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3 
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs 
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end 
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'        

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

在创建任何工作进程之前,将整个data列表复制到Manager进程中。 Manager返回一个Proxy对象,允许对list进行共享访问。 然后只需将Proxy传递给工作进程,这意味着它们的启动时间大大缩短,因为不再需要复制data列表的切片。 这里的缺点是,通过IPC访问列表时,子进程的访问速度会变慢。是否真正有助于性能取决于您在工作进程中对list进行了什么样的处理,但是值得一试,因为几乎不需要更改代码。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接