Python多进程池和参数序列化

3
请看下面的例子:
import multiprocessing as mp

def job(l):
    l.append(1)
    return l

if __name__ == "__main__":
    pool = mp.Pool(1)
    my_list = []
    out = pool.map(job, [my_list for i in range(5)])
    pool.close()
    pool.join()
    print(out)

调用pool.map时,我预期参数会被pickle化并在执行作业时进行反序列化(因此每次重新创建)。然而,观察到的输出是

[[1, 1], [1, 1], [1, 1], [1, 1], [1]]

能有人解释一下发生了什么吗?我期望输出的结果是一个长度为5的列表[1],或者是由[1]、[1, 1]、...、[1, 1, 1, 1, 1]组成的列表,但现实情况却并非如此。

2个回答

2
pool.map中的chunksize参数是导致您困惑的原因。显然,它会选择自动设置chunksize=2,因为即使您明确设置了chunksize=2,您也会得到观察到的输出。
如果使用chunksize=1,则会得到[[1], [1], [1], [1], [1]];如果使用chunksize=3,则会得到[[1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1], [1, 1]]
如果您在代码中添加打印语句,则可以观察发生了什么。
import multiprocessing as mp

def job(l):
    print(f'before append {l}')
    l.append(1)
    print(f'after append {l}')
    return l

if __name__ == "__main__":
    pool = mp.Pool(1)
    my_list = []
    out = pool.map(job, [my_list for _ in range(5)], chunksize=2)
    pool.close()
    pool.join()
    print(out)

这将为您提供以下输出:
before append []
after append [1]
before append [1]
after append [1, 1]
before append []
after append [1]
before append [1]
after append [1, 1]
before append []
after append [1]
[[1, 1], [1, 1], [1, 1], [1, 1], [1]]

Process finished with exit code 0

您会发现,“before append”仅在空列表中出现3次,而不是您预期的5次。这是因为使用chunksize=2和可迭代的5个项,你有5/2=2.5个任务。半个任务是不可能的,所以最终只有3个任务:2个包含两个项的任务和1个包含一个项的任务。
对于前两个任务,函数job的第一次执行获取未pickle的空列表并附加1。然后第二次执行获取刚刚修改过的同一列表,因为你的项只是此任务内相同列表的引用。第二次执行也更改了第一次执行的结果,因为两者都修改了同一基础对象。第二次执行完成后,任务也完成了,并将两次执行的结果[[1, 1],[1, 1]]发送回父级。正如我们所说的,这发生在前两个任务中。
第三个任务只有一次执行job,其结果不会被第二个执行修改,因此结果仅为[1]。
如果在代码末尾添加for obj in out: print(id(obj)),则会看到您获得了三个不同的id,表示处理可迭代的任务数(CPython)时产生了三个单独的列表。
140584841382600
140584841382600
140584841383432
140584841383432
140584841383368

0

这会因为进程数量的不同而产生不同的结果,这意味着你正在做一些不安全的进程操作;在这种情况下,可能会在多个进程中操作本地列表。

我不太清楚你想要实现什么,但至少这样行为是一致的:

from multiprocessing import Pool, Manager


def job(l):
    l.append(1)
    return l


if __name__ == "__main__":
    manager = Manager()

    for proc_count in range(1, 6):
        print(proc_count)
        pool = Pool(proc_count)
        my_list = manager.list()
        out = pool.map(job, [my_list for i in range(5)])
        pool.close()
        pool.join()
        print(list(list(o) for o in out))

如果您不想这样做,可以忽略管理器,删除my_list并使用[list() for i in range(5)]也会产生一致的,尽管不同的行为。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接