将functools.lru_cache与multiprocessing.Pool结合使用

24

我有一个涉及多个参数的复杂递归函数(如果有人想知道,它是Obara-Saika-Scheme),我希望能够更高效地对其进行评估。

作为第一步,我应用了@functools.lru_cache。作为第二步,我现在想使用multiprocessing.Pool来异步地评估一个长列表的输入参数。

根据 functools Python文档中的第二个示例并添加一个工作进程池,我现在有:

from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

with Pool(processes=4) as pool:
    for i in range(10):
        res = pool.apply_async(fibonacci, (i,))
        print(res.get())

print(fibonacci.cache_info())

问题1:
如何使缓存在不同的工作进程之间共享。另一个问题(如何共享缓存?)也在问类似的事情,但我无法使其工作。这里是我的两个失败尝试。
使用:
from multiprocessing import Pool
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

res = []
with Pool(processes=4) as pool:

    # submit first task
    res.append(pool.apply_async(fibonacci, (5,)).get())

    # give fibonacci() some time to fill its cache
    time.sleep(1)

    # submit second task
    res.append(pool.apply_async(fibonacci, (3,)).get())

print(res)

使用concurrent.futures:
import concurrent.futures
from functools import lru_cache

import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

    @lru_cache(maxsize=10)
    def fib_async(n):
        print('calculating fib_async(%i)' %n)
        if n < 2:
            return n
        return fibonacci(n-1) + fibonacci(n-2)

    res = []

    # submit first task
    res.append(executor.submit(fib_async, 5))

    # give fib_async() some time to fill its cache
    time.sleep(1)

    # submit second task
    res.append(executor.submit(fib_async, 3))


res = [e.result() for e in res]

print(res)

两种方法基本产生相同的输出,表明第二个任务重新计算了fibonacci(2),尽管第一个任务已经计算过了。如何让缓存共享?
这应该会加快速度,但如果重复调用时间不好,仍然存在问题:当前由worker1评估的调用尚未被缓存,worker2可能开始评估相同的内容。这就带来了一个问题:
问题2
计算斐波那契数列在其递归中是相当线性的,即只有一个参数被减少。我的函数更复杂,我需要管理已经计算过的输入参数,但也要跟踪当前正在计算的内容。
明确一下:我想对递归函数进行多个并行调用,这将产生许多新的递归函数调用。
一个棘手的问题可能是避免直接将一个调用分配给一个worker,因为当递归深度超过worker数量时,这将导致死锁。
是否已经有这样的东西可以使用?还是我需要自己构建一些东西?我偶然发现了multiprocessing.managersconcurrent.futures.ProcessPoolExecutor,它们可能会有所帮助。但我需要一些帮助才能开始。
1个回答

9

由于您需要的功能是CPU密集型的,选择使用multiprocessing 是正确的选择。

@lru_cache函数使用内存缓存。每个Python进程都包含自己的内存块,因此您将生成2个独立的缓存(存在不同的内存空间)。

如果您想要同步这些缓存,您需要使用某种内存同步机制,例如锁等。默认的lru_cache方法不支持多进程,但是您可以很容易地自己实现一个。

只需使用共享字典(这里有一个很好的例子)来保存缓存项,并使用锁封装对该字典的访问(有关参考,请查看 Python Wiki页面)。这样,您就可以在进程之间共享字典,同时保持访问安全。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接