我有一个Python Pandas数据框的字典,这个字典的总大小约为2GB。但是,当我在16个多进程之间共享它时(在子进程中,我只读取字典的数据而不修改它),它占用32GB的内存。所以我想问一下,在不复制它的情况下,是否可能跨多处理共享这个字典。我尝试将其转换为manager.dict(),但似乎耗费时间太长。最标准的方法是什么?谢谢。
我有一个Python Pandas数据框的字典,这个字典的总大小约为2GB。但是,当我在16个多进程之间共享它时(在子进程中,我只读取字典的数据而不修改它),它占用32GB的内存。所以我想问一下,在不复制它的情况下,是否可能跨多处理共享这个字典。我尝试将其转换为manager.dict(),但似乎耗费时间太长。最标准的方法是什么?谢谢。
#!/usr/bin/python
from multiprocessing.managers import SyncManager
import numpy
# Global for storing the data to be served
gData = {}
# Proxy class to be shared with different processes
# Don't put big data in here since that will force it to be piped to the
# other process when instantiated there, instead just return a portion of
# the global data when requested.
class DataProxy(object):
def __init__(self):
pass
def getData(self, key, default=None):
global gData
return gData.get(key, None)
if __name__ == '__main__':
port = 5000
print 'Simulate loading some data'
for i in xrange(1000):
gData[i] = numpy.random.rand(1000)
# Start the server on address(host,port)
print 'Serving data. Press <ctrl>-c to stop.'
class myManager(SyncManager): pass
myManager.register('DataProxy', DataProxy)
mgr = myManager(address=('', port), authkey='DataProxy01')
server = mgr.get_server()
server.serve_forever()
运行上述代码一次并让它保持运行状态。下面是客户端类,您可以使用该类来访问数据。
DataClient.py
from multiprocessing.managers import BaseManager
import psutil #3rd party module for process info (not strictly required)
# Grab the shared proxy class. All methods in that class will be availble here
class DataClient(object):
def __init__(self, port):
assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'
class myManager(BaseManager): pass
myManager.register('DataProxy')
self.mgr = myManager(address=('localhost', port), authkey='DataProxy01')
self.mgr.connect()
self.proxy = self.mgr.DataProxy()
# Verify the server is running (not required)
@staticmethod
def _checkForProcess(name):
for proc in psutil.process_iter():
if proc.name() == name:
return True
return False
#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from DataClient import *
# Confusing, but the "proxy" will be global to each subprocess,
# it's not shared across all processes.
gProxy = None
gMode = None
gDummy = None
def init(port, mode):
global gProxy, gMode, gDummy
gProxy = DataClient(port).proxy
gMode = mode
gDummy = numpy.random.rand(1000) # Same as the dummy in the server
#print 'Init proxy ', id(gProxy), 'in ', mp.current_process()
def worker(key):
global gProxy, gMode, gDummy
if 0 == gMode: # get from proxy
array = gProxy.getData(key)
elif 1 == gMode: # bypass retrieve to test difference
array = gDummy
else: assert 0, 'unknown mode: %s' % gMode
for i in range(1000):
x = sum(array)
return x
if __name__ == '__main__':
port = 5000
maxkey = 1000
numpts = 100
for mode in [1, 0]:
for nprocs in [16, 1]:
if 0==mode: print 'Using client/server and %d processes' % nprocs
if 1==mode: print 'Using local data and %d processes' % nprocs
keys = [numpy.random.randint(0,maxkey) for k in xrange(numpts)]
pool = mp.Pool(nprocs, initializer=init, initargs=(port,mode))
start = time.time()
ret_data = pool.map(worker, keys, chunksize=1)
print ' took %4.3f seconds' % (time.time()-start)
pool.close()
Using local data and 16 processes
took 0.695 seconds
Using local data and 1 processes
took 5.849 seconds
Using client/server and 16 processes
took 0.811 seconds
Using client/server and 1 processes
took 5.956 seconds
x=sum(array)
循环中的迭代次数降低,就可以看到这一点。在某个时候,你会花费更多的时间来获取数据而不是处理它。