使用多进程的共享内存字符串数组

3
我正在尝试用多进程处理一些现有的代码,并发现使用 Pool 进行数据的序列化/反序列化太慢了。我认为对于我的情况,使用 Manager 也会遇到类似的问题,因为它在幕后执行同样的序列化操作。
为了解决这个问题,我尝试切换到共享内存数组。为了让它正常工作,我需要一个字符串数组。似乎 multiprocessing.Array 支持 ctypes.c_char_p,但我很难将其扩展为字符串数组。下面是我尝试过的一些方法。
#!/usr/bin/python
import ctypes
import multiprocessing as mp
import multiprocessing.sharedctypes as mpsc
import numpy

# Tested possible solutions
ver = 1
if 1==ver:
    strings = mpsc.RawArray(ctypes.c_char_p, (' '*10, ' '*10, ' '*10, ' '*10))
elif 2==ver:
    tmp_strings = [mpsc.RawValue(ctypes.c_char_p, ' '*10) for i in xrange(4)]
    strings = mpsc.RawArray(ctypes.c_char_p, tmp_strings)
elif 3==ver:
    strings = []
    for i in xrange(4):
        strings.append( mpsc.RawValue(ctypes.c_char_p, 10) )

def worker(args):
    snum, lenarg = args
    string = '%s' % snum
    string *= lenarg
    strings[snum] = string
    return string

# Main progam
data = [(i, numpy.random.randint(1,10)) for i in xrange(3)]
print 'Testing version ', ver
print
print 'Single process'
for x in map(worker, data):
    print '%10s : %s' % (x, list(strings))
print

print 'Multi-process'
pool = mp.Pool(3)
for x in pool.map(worker, data):
    print '%10s : %s' % (x, list(strings))
    print '            ', [isinstance(s, str) for s in strings]

请注意,我正在使用multiprocessing.sharedctypes,因为我不需要锁定,而且它应该与multiprocessing.Array相当可互换。
上述代码的问题在于生成的strings对象包含常规字符串,而不是来自mpsc.RawArray构造函数的共享内存字符串。通过版本1和2,您可以看到在进程外部工作时数据如何被混淆(如预期)。对我而言,版本3一开始看起来好像行得通,但您可以看到等号只是将对象设置为普通字符串,虽然这对于短测试有效,但在较大的程序中会创建问题。
似乎应该有一种方法来创建一个指针的共享数组,其中指针指向共享内存空间中的字符串。如果您尝试使用c_str_p类型初始化c_void_p类型,则会收到投诉,而且我还没有成功地直接操作底层地址指针。
任何帮助将不胜感激。
1个回答

3

首先,你的第三种解决方案不起作用,因为 strings 没有被多进程部分修改,而是已经被单进程部分修改了。您可以通过注释掉单进程部分来进行检查。

其次,以下方法可以起作用:

import ctypes
import multiprocessing as mp
import multiprocessing.sharedctypes as mpsc
import numpy

strings = [mpsc.RawArray(ctypes.c_char, 10) for _ in xrange(4)]

def worker(args):
    snum, lenarg = args
    string = '%s' % snum
    string *= lenarg
    strings[snum].value = string
    return string

# Main progam
data = [(i, numpy.random.randint(1,10)) for i in xrange(4)]

print 'Multi-process'
print "Before: %s" % [item.value for item in strings]
pool = mp.Pool(4)
pool.map(worker, data)
print 'After : %s' % [item.value for item in strings]

输出:

Multi-process
Before: ['', '', '', '']
After : ['0000000', '111111', '222', '3333']

谢谢。应该自己发现这个问题。肯定比将数据进行 pickle 处理运行得更快。 - bivouac0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接