Python中的lazy processpoolexecutor是什么?

7

我有许多任务需要执行,并希望通过生成器使结果可用。但是,使用ProcessPoolExecutoras_completed将贪婪地评估结果并将它们全部存储在内存中。是否有一种方法可以在存储了一定数量的结果后阻塞生成器?

2个回答

7

这个想法是将您要处理的内容分成块,我将使用与ProcessPoolExecutor文档中几乎相同的示例:

import concurrent.futures
import math
import itertools as it

PRIMES = [
    293,
    171,
    293,
    773,
    99,
    5419,
    293,
    171,
    293,
    773,
    99,
    5419,
    293,
    171,
    293,
    773,
    99,
    5419]


def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

def main_lazy():
    chunks = map(lambda x: it.islice(PRIMES, x, x+4), range(0, len(PRIMES), 4))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES, 
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), 
                                                 chunks)))
        for number, prime in (next(results) for _ in range(4)):
            print('%d is prime: %s' % (number, prime))

if __name__ == "__main__":
    main_lazy()

请注意mainmain_lazy之间的区别,让我们解释一下:
我将要处理的所有内容分成了大小为4的块(使用itertools.islice非常有用),这样做的想法是,我们不会将整个列表映射到执行程序中,而是将每个块映射到执行程序中。然后只需使用Python3的lazy map,我们就可以将该执行程序调用懒惰地映射到每个块。因此,我们知道executor.map不是lazy的,因此当我们请求它时,该块将立即被计算,但在我们不请求其他块之前,该块的executor.map不会被调用。 正如您所看到的,我只请求了整个结果列表中的前4个元素,但由于我还使用了itertools.chain,它只会消耗第一个块中的元素,而不计算可迭代对象的其余部分。
因此,由于您希望返回生成器,所以只需从main_lazy函数返回结果即可,甚至可以抽象出块大小(可能需要一个好的函数来获取适当的块,但这超出了范围)。
def main_lazy(chunk_size):
    chunks = map(lambda x: it.islice(PRIMES, x, x+chunk_size), range(0, len(PRIMES), chunk_size))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES, 
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), 
                                                 chunks)))
        return results

2
这里的问题(如果我说错了,请纠正我)是并行进程的使用不够优化。只有在前一个块的所有进程都完成时,您才会开始处理下一个块,因此可能会有很多空闲时间。 - Rafael Martins
我不确定我是否理解你的意思,@RafaelMartins,但在这种情况下,每个进程都在“同时”计算每个块。或者至少您可以为池中的每个进程计算一个块。 - Netwave
可能因为所有的zip和map使自己感到困惑,但是第一次调用executor.map(is_prime, x)将立即读取整个第一个块并将每个质数分配给处理器(共进行4个并发执行)。然后,在请求更多结果之前返回这4个结果中的每一个,触发第二个块的executor.map等等。问题是:第二个块只有在第一个块完全完成后才会被处理,所以在它们之间可能会有很多空闲时间。理想的替代方案是在每个质数完成后立即启动一个新的进程。 - Rafael Martins
@RafaelMartins,是的,那将是理想情况。但在这种情况下,问题是要避免过多的内存转储。因此,你只需要计算该时段所需的块(如我所记得的)。 - Netwave

0

我写了一个小的代码片段,可以实现所需功能,而不会因使用批处理而影响性能。

使用方法如下:


def work(inp: In) -> Out: ...

with ProcessPoolExecutor() as ex:
  # also works with ThreadPoolExecutor

  for out in lazy_executor_map(work_fn, inputs_iterable, ex):
      ...

而且实现本身:

from concurrent.futures import Executor, Future, wait, FIRST_COMPLETED
from typing import Callable, Iterable, Iterator, TypeVar
from typing_extensions import TypeVar

In = TypeVar("In")
Out = TypeVar("Out")

def lazy_executor_map(
    fn: Callable[[In], Out],
    it: Iterable[In],
    ex: Executor,
    # may want this to be equal to the n_threads/n_processes 
    n_concurrent: int = 6
) -> Iterator[Out]:
    
    queue: list[Future[Out]] = []
    in_progress: set[Future[Out]] = set()
    itr = iter(it)
    
    try:
        while True:

            for _ in range(n_concurrent - len(in_progress)):
                el = next(itr) # this line will raise StopIteration when finished
                # - which will get caught by the try: except: below
                fut = ex.submit(fn, el)
                queue.append(fut)
                in_progress.add(fut)

            _, in_progress = wait(in_progress, return_when=FIRST_COMPLETED)
            
            # iterate over the queue, yielding outputs if available in the order they came in with
            while queue and queue[0].done():
                yield queue.pop(0).result()

    except StopIteration:
        wait(queue)
        for fut in queue:
            yield fut.result()

还没有与批处理版本进行比较,但它似乎表现良好。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接