如何将具有多个参数的函数传递给Python concurrent.futures.ProcessPoolExecutor.map()?

19
我希望concurrent.futures.ProcessPoolExecutor.map()能够调用由2个或更多参数组成的函数。在下面的示例中,我使用了一个lambda函数,并将ref定义为与numberlist大小相等且具有相同值的数组。 第一个问题:是否有更好的方法?当numberlist的大小可能为百万到十亿个元素时,因此ref的大小必须跟随numberlist,这种方法会不必要地占用宝贵的内存,我希望避免这种情况。我之所以这样做是因为我读到map函数将终止其映射,直到达到最短数组的末尾。
import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3


def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')

运行上述代码后,我发现map函数能够实现我想要的结果。然而,当我将相同的术语转移到concurrent.futures.ProcessPoolExecutor.map()时,Python3.5出现了以下错误:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed

问题2:为什么会出现这个错误,如何让concurrent.futures.ProcessPoolExecutor.map()调用一个带有多个参数的函数?

3个回答

16

先回答你的第二个问题,你遇到异常是因为像你正在使用的那个lambda函数无法被序列化。由于Python使用pickle协议来序列化主进程和ProcessPoolExecutor的工作进程之间传递的数据,这是一个问题。不清楚你为什么要使用lambda。你原本使用的lambda函数接受了两个参数,就像原始函数一样。你可以直接使用_findmatch而不是lambda,它应该可以工作。

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...

关于如何在不创建大列表的情况下传递第二个常量参数的问题,你可以通过几种方式来解决。其中一种方法可能是使用itertools.repeat创建一个可迭代对象,当进行迭代时会无限重复同一个值。

但更好的方法可能是编写额外的函数来为你传递常量参数。(也许这就是你尝试使用lambda函数的原因?)如果你使用的函数可以在模块的顶层命名空间中访问,那么它应该能够正常工作:

def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...

1
你说得对,我尝试使用 lambda 是因为最初当 ref 是常量时,我无法将带有2个参数的函数传递到 executor 中。 将 ref 转换为与 numberlist 相同大小的列表后,我突然意识到我忘记删除 lambda。 我真正想要的解决方案是 ref 像常量或类似的东西一样。 所以你提到的辅助函数和 itertools.repeat 很管用。谢谢。 - Sun Bear
我想邀请您回答我的后续问题,在那里我对Executor.mapExecutor.submit的性能进行了基准测试,发现前者明显较慢,我想知道原因是什么? - Sun Bear

11

(1) 不需要制作列表。您可以使用 itertools.repeat 创建一个迭代器,它只重复某个值。

(2) 您需要将一个命名函数传递给 map,因为它将被传递给子进程进行执行。 map 使用 pickle 协议发送内容,而 lambda 表达式不能被序列化,因此它们不能成为 map 的一部分。但这是完全不必要的。您的 lambda 表达式只是用两个参数调用了一个有两个参数的函数。请完全删除它。

可工作的代码如下:

import concurrent.futures as cf
import itertools

nmax = 10
numberlist = range(nmax)
workers = 3

def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
        print(type(n))
        print(n)
        #if str(ref[0]) in n:
        #    print('match')

1
感谢您的解释和解决方案。 :) - Sun Bear
1
我想邀请你回答我的后续问题,我在其中对Executor.mapExecutor.submit的性能进行了基准测试,发现前者显著较慢,我想知道原因是什么? - Sun Bear

9

关于您的第一个问题,我是否正确理解您想传递一个参数,其值仅在调用map时确定,但对映射函数的所有实例都是恒定的?如果是这样,我会使用从“模板函数”派生的函数来进行map,并使用functools.partial将第二个参数(在您的示例中为ref)烘焙到其中:

from functools import partial
refval = 5

def _findmatch(ref, listnumber):  # arguments swapped
    ...

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(partial(_findmatch, refval), numberlist):
        ...

关于问题2,第一部分:我没有找到确切的代码片段来尝试pickle(序列化)应该在并行执行的函数,但这听起来很自然 - 不仅参数而且函数也必须以某种方式传输到workers,并且很可能必须进行序列化以进行此传输。在其他地方提到了可以pickle partial函数而不能pickle lambda函数,例如在这里:https://dev59.com/b3A75IYBdhLWcg3wg5Zh#19279016

关于问题2,第二部分:如果您想在ProcessPoolExecutor.map中调用具有多个参数的函数,则将函数作为第一个参数传递给它,后跟可迭代的函数第一个参数,后跟其第二个参数等。 在您的情况下:

for n in executor.map(_findmatch, numberlist, ref):
    ...

谢谢分享。 :) 你的解决方案很有效。这是我第一次学习关于partial的知识。 - Sun Bear
我想邀请您回答我的后续问题,在那里我对Executor.mapExecutor.submit的性能进行了基准测试,发现前者明显较慢,我想知道原因是什么? - Sun Bear
@mkorvas 我使用了你的解决方案来回答我的问题 https://stackoverflow.com/questions/56492876/unable-to-send-multiple-arguments-to-concurrrent-futures-executor-map - gansub

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接