简单并行化尴尬可并行生成器的方法

3

我有一个生成器(或者,一组生成器)。让我们把它们称为gens

gens中的每个生成器都是一个复杂的函数,它返回复杂过程的下一个值。幸运的是,它们相互独立。

我想对gens中的每个元素gen调用gen.__next __(),并将结果值作为列表返回。但是,multiprocessing无法对生成器进行pickle处理。

在Python中,是否有一种快速简单的方法来实现这一点?我希望gens的长度为m,在我的本地机器上映射到n个核心,其中n可以大于或小于m。每个生成器应该在单独的核心上运行。

如果这是可能的,能否提供一个最小的示例?


它可能走在正确的轨道上,但我正在寻找一个最简解决方案。至少有几个因素——文件I/O和无法运行的示例——阻止它成为真正易于使用和消化的问答。 - user650261
but I am looking for a minimal solution, great but that would require a minimal reproducible example from your side as well :). Maybe give an example of the complicated function that returns the next value of a complicated procedure - Akshay Sehgal
你的复杂程序具体是什么?它们是否计算密集型(例如解决物理方程),进行大量的磁盘读写或网络操作? - morphheus
计算量大,但没有文件I/O或网络。 - user650261
@PaulBrennan 那个问题只使用生成器来产生输入;MP工作进程本身并不是基于生成器的。 - Martijn Pieters
显示剩余2条评论
4个回答

3
您无法对生成器进行pickle。在这里阅读更多相关信息。
有一篇博客文章详细解释了这个问题。引用其中的一句话:
“暂时忽略这个问题,我们来看看如何pickle一个生成器。由于生成器本质上是一个功能强大的函数,我们需要保存它的字节码,但不能保证在Python版本之间向后兼容,以及它的帧,它保存生成器的状态,例如局部变量、闭包和指令指针。而后者实现起来相当麻烦,因为它基本上要求使整个解释器可pickle。因此,任何支持pickle生成器的支持都需要对CPython核心进行大量更改。”
“现在,如果一个不受pickle支持的对象(例如文件句柄、套接字、数据库连接等)出现在生成器的局部变量中,那么该生成器无论我们是否实现了pickle支持,都无法自动pickle。因此,在这种情况下,您仍然需要提供自定义的getstate和setstate方法。这个问题使得对生成器的pickle支持相当有限。”
他还建议一种解决方法,使用简单的迭代器。
最好的解决方法是将生成器重写为简单迭代器(即,具有__next__方法的迭代器)。 迭代器在空间上易于处理和高效,因为它们的状态是显式的。 但是,您仍然需要明确地处理表示某些外部状态的对象; 您无法绕开这一点。
另一个提供的解决方案(我没有尝试过)建议如下:
1. 将生成器转换为类,在该类中,生成器代码是__iter__方法。 2. 向该类添加__getstate__和__setstate__方法,以处理pickling。请记住,您无法pickle文件对象。 因此,__setstate__将必须根据需要重新打开文件。

__iter__ 方法仍然不能是生成器。生成器是一种在(暂停的)函数中保持状态的迭代器。您可以通过实现 __next__ 来实现迭代器而不使用生成器。 - Martijn Pieters

0

如果您的子任务真正并行(不依赖于任何共享状态),则可以使用multiprocessing.Pool()实现此目的。

请参阅https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool

这要求您使pool.map()的参数可序列化。您无法将生成器传递给工作进程,但是您可以通过在目标函数内定义生成器并将初始化参数传递给多进程库来实现类似的功能:

import multiprocessing as mp
import time

def worker(value):
    # The generator is defined inside the multiprocessed function
    def gen():
        for k in range(value):
            time.sleep(1) # Simulate long running task
            yield k

    # Execute the generator
    for x in gen():
        print(x)
        # Do something with x?
        pass


pool = mp.Pool()
pool.map(worker, [2, 5, 2])

pool.join() # Wait for all the work to be finished.
pool.close() # Clean up system resources

输出结果将是:

0
0
0
1
1
1
2
3
4


请注意,此解决方案仅在您构建生成器后仅使用一次时才真正有效,因为它们的最终状态在工作函数结束时丢失。
请记住,每当您想要使用多进程时,由于进程间通信的限制,您必须使用可序列化对象;这通常会限制您的选择。
如果您的进程不是 CPU 绑定而是 I/O 绑定(磁盘访问、网络访问等),那么使用线程会更容易。

两个小提示:
  1. 你的代码中有两个错误。在 map 后面需要加上 pool.close(),然后最后只需要 pool.join()。
  2. 这部分代码部分可行,但是没有办法在调用之间继续使用生成器。每次执行时都会清除它离开的状态,这样就有点违背了使用生成器的初衷。
- user650261
  1. 关于连接和关闭的观点很好,我会更新答案。
  2. 我知道状态会丢失,这就是为什么我提到它只类似于传递生成器。这个解决方案适用于“一次性”生成器。我会添加另一个评论来澄清这一点。
- morphheus

0
你不需要将生成器pickle,只需向进程池发送生成器的索引即可。
M = len(gens)
N = multiprocessing.cpu_count()

def proc(gen_idx):
    return [r for r in gens[gen_idx]()]

if __name__ == "__main__":
    with multiprocessing.Pool(N) as p:
        for r in p.imap_unordered(proc, range(M)):
            print(r)

请注意,直到在处理函数内部才调用/初始化生成器。
使用imap_unordered将允许您在每个生成器完成时处理结果。

0

实现起来非常容易,只需避免同步阻塞线程,不断循环遍历各种状态并在完成时合并它们即可。这个模板应该足够好,self.done 总是需要在线程完成时最后设置,并作为线程重用的最后一个。

import threading as th
import random
import time

class Gen_thread(th.Thread):

    def is_done(self):
        return self.done

    def get_result(self):
        return self.work_result

    def __init__(self, *args, **kwargs):
        self.g_id = kwargs['id']
        self.kwargs = kwargs
        self.args = args
        self.work_result = None
        self.done = False
        th.Thread.__init__(self)

    def run(self):
        # time.sleep(*self.args) to pass variables
        time.sleep(random.randint(1, 4))
        self.work_result = 'Thread {0} done'.format(self.g_id + 1)
        self.done = True

class Gens(object):

    def __init__(self, n):
        self.n_needed = 0
        self.n_done = 0
        self.n_loop = n

        self.workers_tmp = None
        self.workers = []

    def __iter__(self):
        return self

    def __next__(self):
        if self.n_needed == 0:
            for w in range(self.n_loop):
                self.workers.append(Gen_thread(id=w))
                self.workers[w].start()
                self.n_needed += 1

        while self.n_done != self.n_needed:
            for w in range(self.n_loop):
                if self.workers[w].is_done():
                    self.workers[w].join()
                    self.workers_tmp = self.workers[w].get_result()
                    self.workers.pop(w)
                    self.n_loop -= 1
                    self.n_done += 1
                    return self.workers_tmp

        raise StopIteration()


if __name__ == '__main__':
    for gen in Gens(4):
        print(gen)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接