将共享内存数组与Pool.map结合使用在Python并发编程中

69

我有一个非常大的(只读)数据数组,希望能够并行地由多个进程处理。

我喜欢Pool.map函数,并希望使用它来并行计算数据上的函数。

我发现可以使用ValueArray类在进程之间共享内存数据。但是当我尝试使用这些类时,在使用Pool.map函数时会出现RuntimeError:'SynchronizedString objects should only be shared between processes through inheritance'

以下是我正在尝试做的简化示例:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

有人能告诉我这里我做错了什么吗?

我想要做的是,在进程池中创建进程后,将关于新创建的共享内存分配数组的信息传递给这些进程。


1
很遗憾,这是不可能的。根据mp文档的建议,在fork平台上使用继承是推荐的方法。对于只读数据,通常会使用全局变量,但可以使用共享数组进行读/写通信。分叉是便宜的,所以每当您接收数据时,可以重新创建池,然后在之后关闭它。不幸的是,在Windows上这是不可能的 - 解决方法是使用共享内存数组(即使在只读情况下也是如此),但这只能在进程创建时传递给子进程(我想他们需要被添加到访问列表中... - robince
对于共享内存段以及这个逻辑在子进程启动时没有实现(除了在子进程启动时)。您可以像我展示的那样,在池启动时传递共享数据数组,或者以类似的方式将其传递给进程。您不能将共享内存数组传递给打开的池 - 您必须在内存之后创建池。解决这个问题的简单方法包括分配最大大小缓冲区,或者在启动池之前知道所需大小时仅分配数组。如果您将全局变量保持在较低水平,则在Windows上使用池也不会太昂贵 - 全局变量会自动... - robince
将数据进行序列化并发送到子进程,这就是为什么我建议在开始时制作一个足够大的缓冲区(希望您的全局变量数量较少),然后使用Pool更好的原因。在您编辑问题之前,我花时间诚心诚意地理解和解决了您的问题,因此,虽然我理解您想让它运行,但最终如果没有出现实质性不同/更好的答案,希望您考虑接受我的答案。 - robince
我仔细查看了源代码,发现关于共享内存的信息可以被pickled(需要将其信息传递到Windows上的客户端进程),但该代码有一个assert只能在进程生成期间运行。我想知道这是为什么。 - Jeroen Dirks
4个回答

61

我看到赏金,因此再次尝试回答问题 ;)

基本上,我认为错误信息的意思就是它所说的 - multiprocessing共享内存数组不能作为参数传递(通过pickle),序列化数据没有任何意义 - 重点在于数据是共享内存,因此您必须将共享数组作为全局变量。我认为将其作为模块属性更加整洁,就像我的第一个答案中那样,但是仅将其保留为全局变量也可以很好地工作。考虑到您不希望在fork之前设置数据这一点,这是一个修改后的示例。如果您想要有多个可能的共享数组(这就是为什么您想要将toShare作为参数传递),您可以同样制作一个共享数组的全局列表,并将索引传递给count_it(它将成为for c in toShare [i]:)。

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[编辑:由于未使用fork,上述方法在Windows上无法运行。但是,下面的方法仍然使用Pool可以在Windows上运行,因此我认为这是最接近您想要的方法:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

不确定为什么地图不会对数组进行Pickling,但是进程和池会 - 我认为可能是在Windows上的子进程初始化时传输。请注意,数据在分叉之后仍然被设置。


即使在具有fork功能的平台上,也不能在fork之后将新的共享数据插入到toShare中,因为此时每个进程都将拥有自己独立的副本。 - Jeroen Dirks
所以真正的问题似乎在于我们如何将关于数组的信息进行序列化,以便可以将其发送并连接到其他进程。 - Jeroen Dirks
@James - 不对。数组必须在分叉之前设置,但是它是共享内存,可以更改,并且所有子进程都可以看到更改。看看这个例子 - 我将数据放入数组中分叉之后(当Pool()被实例化时发生)。该数据可以在运行时获取,在分叉之后,只要它适合预先分配的共享内存段,就可以将其复制到那里并从所有子进程中看到。 - robince
你已经接近成功了,但是仍然存在一个问题,即在创建进程之前必须先确定 toShare 数组的长度。因此,在创建进程之前,你仍然在创建共享内存段。我真正想看到的一般解决方案是,在创建池后创建一个新的可变长度共享数组的方法,将有关它的信息传递给工作进程,并从中读取。 - Jeroen Dirks
无论如何,这似乎是一个人为的要求。如果新的数据集大小与当前共享缓冲区的大小不匹配 - 您可以只需关闭池(pool.close()),创建所需大小的新共享数组并打开新池。对于任何值得使用多进程的计算任务,关闭和打开池的开销都将非常小。而且池操作相对原子化 - 因此不像您可以在映射命令的中间注入新鲜数据。 - robince
显示剩余4条评论

9
如果你看到以下错误信息:
运行时错误:同步对象只应通过继承在进程之间共享
考虑使用 multiprocessing.Manager,因为它没有这个限制。管理器的工作原理是它可能在完全不同的进程中运行。
import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1

这是我在使用multiprocessing.Pool时实际上得到的唯一建议,而且我不需要显式处理manager.Lock - raphael
@raphael,你是在断言Value有一个隐式锁吗?显式锁的存在是为了防止竞态条件,从而防止在多个进程更新计数时出现错误的计数。 - Asclepius

7
如果数据是只读的,那么在从池中分叉之前,将其作为变量放入模块中。然后所有子进程都应该能够访问它,并且只要不写入它,它就不会被复制。
import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

如果您确实想尝试使用Array,您可以尝试使用lock=False关键字参数(默认为true)。


我不相信使用全局变量是安全的,而且在Windows上也肯定行不通,因为进程没有被分叉。 - Jeroen Dirks
1
这为什么不安全呢?如果你只需要读取数据,那就没问题。如果你错误地对其进行了写入,则修改的页面将被复制到子进程上进行写时复制,因此不会造成任何损害(例如不会干扰其他进程)。不过,你说得对,在 Windows 上这行不通... - robince
你说得没错,在基于fork的平台上是安全的。但我想知道在进程池创建后是否有一种基于共享内存的方式来共享大量数据。 - Jeroen Dirks

7
我看到的问题是Pool不支持通过其参数列表对共享数据进行pickle。这就是错误消息所说的“对象只能通过继承在进程之间共享”。如果您要使用Pool类共享数据,则需要继承共享数据,即全局继承。
如果您需要显式地传递它们,可能需要使用multiprocessing.Process。以下是您重新编写的示例:
from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

输出结果:('s', 9) ('a', 2) ('b', 3) ('d', 12)

队列中的元素顺序可能会有所不同。

为了让它更加通用并类似于池,您可以创建固定数量 N 的进程,将键列表分成 N 份,然后使用包装函数作为进程目标,它将调用 count_it 函数来处理传递给它的列表中的每个键,例如:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接