如何在Python的多进程池中使用值

9
我希望能够使用multiprocessing库中的Values模块来跟踪数据。据我所知,当涉及到Python中的多进程操作时,每个进程都有自己的副本,因此我不能编辑全局变量。我想使用Values来解决这个问题。有谁知道我如何将Values数据传递到池化函数中?
from multiprocessing import Pool, Value
import itertools

arr = [2,6,8,7,4,2,5,6,2,4,7,8,5,2,7,4,2,5,6,2,4,7,8,5,2,9,3,2,0,1,5,7,2,8,9,3,2,]

def hello(g, data):
    data.value += 1

if __name__ == '__main__':
    data = Value('i', 0)
    func = partial(hello, data)
    p = Pool(processes=1)
    p.map(hello,itertools.izip(arr,itertools.repeat(data)))

    print data.value

这里是我遇到的运行时错误:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

有人知道我做错了什么吗?


我认为你需要将data变量传递到所有的进程中。 - Tom Dalton
@TomDalton 我刚刚更新了代码,使用itertools将数据变量传递到hello函数中,但现在出现了错误,我不确定为什么会发生这种情况。 - user2313602
你为什么不从 hello() 返回数据呢?这正是 map 的全部意义所在。 - Roland Smith
2个回答

10

我不知道为什么,但是使用Pool时似乎存在一些问题,而手动创建子进程则没有这个问题。例如,以下内容可以正常工作:

from multiprocessing import Process, Value

arr = [1,2,3,4,5,6,7,8,9]


def hello(data, g):
    with data.get_lock():
        data.value += 1
    print id(data), g, data.value

if __name__ == '__main__':
    data = Value('i')
    print id(data)

    processes =  []
    for n in arr:
        p = Process(target=hello, args=(data, n))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print "sub process tasks completed"
    print data.value

然而,如果您使用Pool基本上做相同的事情,那么您会得到一个错误信息“RuntimeError: Synchronized objects should only be shared between processes through inheritance”。我以前在使用池时看到过这个错误,并且从未完全搞清楚它的原因。

与使用Value不同的一种方法是使用管理器来提供一个“共享”的列表,这种方法似乎可以与Pool一起使用:

from multiprocessing import Pool, Manager
from functools import partial


arr = [1,2,3,4,5,6,7,8,9]


def hello(data, g):
    data[0] += 1


if __name__ == '__main__':
    m = Manager()
    data = m.list([0])
    hello_data = partial(hello, data)
    p = Pool(processes=5)
    p.map(hello_data, arr)

    print data[0]

太好了!这正是我在寻找的!看来我最终得使用Manager了!非常感谢你,汤姆! - user2313602
关于共享值,这个答案(https://dev59.com/tGHVa4cB1Zd3GeqPrey5#9931389)似乎通过池初始化器进行了一些解决方法,但它似乎是一个相当糟糕的解决方案。请注意,使用管理器会导致潜在的IPC速度较慢,与“真正”的共享内存相比。 - Tom Dalton

-1

使用 Pool.map() 时很少需要使用 Values

map 的核心思想是将函数应用于列表或其他迭代器中的每个项目,将返回值收集到列表中。

Pool.map 的思想基本相同,但分布在多个进程中。在每个工作进程中,映射的函数会使用迭代器中的项目进行调用。 从工作进程中调用的函数的 返回值 被传输回父进程并收集到列表中,最终返回。


另外,您可以使用 Pool.imap_unordered ,它会在结果可用时立即开始返回结果,而不是等到所有操作都完成后再返回结果。因此,您可以统计已返回的结果数量,并使用该数量更新进度条。


3
如果我想要一个进度条,例如,我需要一个计数器,所有的工作进程都能够共同增加。 - user297171
@BarafuAlbino 你可以使用 multiprocessing.Value 来实现。请注意(来自链接文档),在增加值之前,你必须先获取锁! - Roland Smith
3
可以。但是,正如问题所述,Value无法与multiprocessing.Pool.map一起使用。 - user297171
@BarafuAlbino 你应该在 if __name__ == "__main__" 之前创建 Value,这样子进程才能继承它。而且这可能会依赖于操作系统。MS-Windows 比较奇怪的是它没有 fork,这使得在类 UNIX 系统上实现 multiprocessing 更容易。 - Roland Smith
@BarafuAlbino 但是imap_unordered可能更简单。 - Roland Smith
这个答案太荒谬了。当然有理由这样做。首先,我需要跟踪项目之间的状态,因为不同的工作人员来回修改事物。所以是的,这很有帮助。我讨厌那些说"你不需要知道这个"的回答,哈哈。请删除。 - user3413723

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接