Python多进程和管理器

5
我正在使用Python的“multiprocessing”创建并行应用程序。进程需要共享一些数据,为此我使用一个“Manager”。然而,我有一些常见的函数,进程需要调用,并且需要访问由“Manager”对象存储的数据。我的问题是是否可以避免将“Manager”实例作为参数传递给这些常见函数,而是像全局变量一样使用它。换句话说,考虑以下代码:
import multiprocessing as mp

manager = mp.Manager()
global_dict = manager.dict(a=[0])

def add():
    global_dict['a'] += [global_dict['a'][-1]+1]

def foo_parallel(var):
    add()
    print var

num_processes = 5
p = []
for i in range(num_processes):
    p.append(mp.Process(target=foo_parallel,args=(global_dict,)))

[pi.start() for pi in p]
[pi.join() for pi in p]

这在我的机器上运行良好并返回p=[0,1,2,3,4,5]。然而,这样做是否“规范”?这是否是一种好方法,就像定义add(var)并调用add(var)一样好?
1个回答

5
您的代码示例似乎存在比表单更大的问题。您只有运气好才能获得所需的输出。重复执行会产生不同的结果。这是因为+=不是原子操作。多个进程可以依次读取相同的旧值,在它们中的任何一个更新它之前,并且它们将写回相同的值。为了防止这种行为,您需要额外使用Manager.Lock

关于“好的形式”的问题,我的看法是,让子进程的主函数foo_parallelglobal_dict显式地传递给通用函数add(var)会更加简洁。这是一种依赖注入的形式,具有某些优点。在您的示例中,列举了以下优点:

  • 允许隔离测试

  • 提高代码可重用性

  • 更易于调试(检测受管理对象的非访问性不应推迟到调用add时(快速失败

  • 减少样板代码(例如,多个函数需要资源的try-except块)

顺带一提,仅出于副作用而使用列表推导式被认为是一种“代码异味”。如果您不需要结果列表,只需使用for循环即可。

代码:

import os
from multiprocessing import Process, Manager


def add(l):
    l += [l[-1] + 1]
    return l


def foo_parallel(global_dict, lock):
    with lock:
        l = global_dict['a']
        global_dict['a'] = add(l)
        print(os.getpid(), global_dict)


if __name__ == '__main__':

    N_WORKERS = 5

    with Manager() as manager:

        lock = manager.Lock()
        global_dict = manager.dict(a=[0])

        pool = [Process(target=foo_parallel, args=(global_dict, lock))
                for _ in range(N_WORKERS)]

        for p in pool:
            p.start()

        for p in pool:
            p.join()

        print('result', global_dict)

亲爱的Darkonaut,关于我们对“原子操作(+=操作)”的评论,你的例子是否也适用于解决它?例如,在你的“add”函数中,我认为变量“l”可以更新如下:“l.append([l[-1] + 1]。我这样假设是错误的吗?诚挚地, - Philipe Riskalla Leal
似乎不能像我上面建议的那样做。我刚刚测试了一下,出现了“TypeError: can only concatenate list (not "int") to list”的错误。有人能解释为什么吗?不过,解决这个错误的方法似乎是“l = l + [l[-1] + 1]”。 - Philipe Riskalla Leal
亲爱的@PhilipeRiskallaLeal,您会收到此错误,因为在您已经进行附加操作时,当您编写l.append([l[-1] + 1])时,您还添加了另一个嵌套列表。它应该是l.append(l[-1] + 1),这确实比使用+=更好,但它仍然不是原子性的,因为它首先读取l[-1],然后才附加。我上面的解决方案通过使用锁来读取和更新来解决了进程间竞争条件,从而使整个操作在进程间层面上具有原子性。 - Darkonaut
我现在看到了我的错误。谢谢Darkonaut。 - Philipe Riskalla Leal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接