在tee'd生成器上进行多进程处理

5
考虑以下脚本,我测试了使用itertools.tee得到的生成器进行计算的两种方法:
#!/usr/bin/env python3

from sys import argv
from itertools import tee
from multiprocessing import Process

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def compute_double_sum(iterable):
    s = sum(map(double, iterable))
    print(s)

def square(x):
    return x * x

def compute_square_sum(iterable):
    s = sum(map(square, iterable))
    print(s)

g1, g2 = tee(my_generator(), 2)

try:
    processing_type = argv[1]
except IndexError:
    processing_type = "no_multi"

if processing_type == "multi":
    p1 = Process(target=compute_double_sum, args=(g1,))
    p2 = Process(target=compute_square_sum, args=(g2,))
    print("p1 starts")
    p1.start()
    print("p2 starts")
    p2.start()
    p1.join()
    print("p1 finished")
    p2.join()
    print("p2 finished")
else:
    compute_double_sum(g1)
    compute_square_sum(g2)

当我以“正常”模式运行脚本时,这是我得到的结果:

$ ./test_tee.py 
0
1
2
3
4
20
30

这里是并行模式:

$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished

初始生成器似乎被“复制”并执行了两次。

我希望避免这种情况,因为在我的实际应用中,这似乎会导致我使用的一个外部库在初始生成器上出现错误 (https://github.com/pysam-developers/pysam/issues/397),同时仍能够在相同生成的值上进行并行计算。

有没有办法实现我想要的效果?


3
这种情况是预料之中的,因为使用multiprocessing时会复制整个进程,所以每个进程都有自己的迭代器副本。至少在Linux / Unix上是这样。在Windows上,会崩溃,因为multiprocessing不使用forking。相反,您的代码将崩溃,并显示“TypeError:can't pickle generator objects”。 - PM 2Ring
1
为什么需要迭代器?你能在列表中计算结果吗?你应该瞄准“无共享”架构——多进程允许共享代码实际上是不幸的。你可以使用Pool.imap吗? - Antti Haapala -- Слава Україні
在我的实际应用案例中,我宁愿不计算整个列表,因为我的生成器可能会产生数千万或数亿个非常重要的项目。我担心会饱和内存。使用池似乎是一个潜在的解决方法。 - bli
2个回答

1
我在这里找到了一些替代方法:https://dev59.com/fYXca4cB1Zd3GeqPCgcY#26873783
在这种方法中,我们不再将生成器分流。我们只是复制它生成的项,并将它们提供给一个组合函数,在一个进程中对生成的项进行并行处理,但我们利用多进程使用Pool(这是否被称为map/reduce方法?)。
#!/usr/bin/env python3

from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def square(x):
    return x * x

def double_and_square(args_list):
    return (double(*args_list[0]), square(*args_list[1]))

def sum_tuples(tup1, tup2):
    return tuple(starmap(add, zip(tup1, tup2)))

with Pool(processes=5) as pool:
    results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))

    print(reduce(sum_tuples, results_generator))

这在玩具示例中有效。我现在必须想办法以类似的方式组织我的计算来处理真实应用案例。
我尝试使用高阶函数(`make_funcs_applier`)来推广这个概念,生成复合函数(`apply_funcs`),但我遇到了以下错误:
AttributeError: Can't pickle local object  'make_funcs_applier.<locals>.apply_funcs'

更加通用的尝试

基于评论中的建议,我尝试改进上述解决方案以使其更具可重用性:

#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """This kind of object can be used to group functions and call them on a
    tuple of arguments."""
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        self.funcs = funcs

    def __len__(self):
        return len(self.funcs)

    def __call__(self, args_list):
        return tuple(func(args) for func, args in zip(self.funcs, args_list))

    def fork_args(self, args_list):
        """Takes an arguments list and repeat them in a n-tuple."""
        return tuple(repeat(args_list, len(self)))


def sum_tuples(*tuples):
    """Element-wise sum of tuple items."""
    return tuple(starmap(add, zip(*tuples)))


# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
    return x + 1

def double(x):
    return 2 * x

def square(x):
    return x * x

def main():
    def my_generator():
        for i in range(5):
            print(i)
            yield i


    test_tuple_func = FuncApplier((plus_one, double, square))

    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            test_tuple_func,
            (test_tuple_func.fork_args(args_list) for args_list in my_generator()))
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
            sum_tuples, results_generator))
    exit(0)

if __name__ == "__main__":
    exit(main())

测试一下:

$ ./test_fork.py 
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30

由于我经常在代码中定义本地函数,因此仍然存在一些令人讨厌的限制。


2
局部函数确实无法进行序列化;你可以通过使用一个具有__call__方法的顶层类的实例来解决这个问题。 - Antti Haapala -- Слава Україні
感谢您的建议。我已经相应地更新了答案。 - bli
我尝试将这种技术应用到我的实际应用案例中,除非我在cProfile下运行它,否则它似乎是有效的,但在这种情况下,它会在对脚本顶层定义的一个相当正常的函数进行pickle时失败:_pickle.PicklingError: Can't pickle <function count_annot at 0x7f4c2ae91048>: attribute lookup count_annot on main failed。 - bli

0

multiprocessing系统会将您的主模块导入到它启动的每个进程中。因此,模块代码在每个进程中都会被执行。

您可以通过使用始终建议的方法来避免这种情况。

if __name__ == '__main__':

在你的类和函数定义之后,主程序的代码只在启动进程中运行。这本应该只是 Windows 平台的要求,但如果你抱怨代码被运行两次,这也值得一试。


我刚试了一下,但这并不能防止生成器被复制:我仍然看到其输出打印两次。 - bli
哦,好吧...抱歉。很高兴你似乎已经找到了解决这个问题的方法。 - holdenweb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接