ThreadPoolExecutor().map和ThreadPoolExecutor().submit有什么不同?

114

我刚写的一些代码让我感到非常困惑。出乎意料的是,我发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个函数返回 f 返回类型的列表,第二个函数返回需要使用它们的 result() 方法来评估的 concurrent.futures.Future 对象列表,以获取 f 返回的值。

我的主要关注点是这意味着 executor.map 无法利用 concurrent.futures.as_completed,后者似乎是一种极其方便的方式,在我执行某些长时间运行的数据库调用并在它们变为可用时对它们进行评估时使用。

我并不完全清楚 concurrent.futures.ThreadPoolExecutor 对象的工作原理--天真的说,我更喜欢(略微冗长的):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

我是否错误地更倾向于使用更简洁的executor.map,以便利用可能的性能提升?

4个回答

61
问题在于您将 ThreadPoolExecutor.map 的结果转换为列表。如果不这样做,而是直接在生成器上迭代,结果仍然按照原始顺序产生,但循环在所有结果准备好之前就继续了。您可以使用以下示例进行测试:
import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

保持顺序的原因可能是因为有时候按照map函数的给定顺序得到结果非常重要。而且结果很可能没有被包装在future对象中,因为在某些情况下如果需要获取所有结果,再对列表执行另一个map可能会花费太长时间。最后,在大多数情况下,在循环处理第一个值之前,下一个值很可能已经准备好了。这在以下示例中得到了证明:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))
在这个例子中,可能会发生do_some_stuff所需时间比crunch_number长得多的情况,如果是这种情况,尽管保持map的易用性,性能损失并不大。 此外,由于工作线程(/进程)从列表开头开始处理并按照列表顺序处理提交的结果,因此结果应该已经按照迭代器产生的顺序完成。这意味着在大多数情况下,executor.map就可以了,但在某些情况下,例如如果处理值的顺序无关紧要并且传递给map的函数运行时间差异很大,则future.as_completed可能更快。

2
是的,我不关心返回顺序,我更关心尽快完成任务。我的担忧是,当结果顺序无关紧要时,executor.map 的性能表现会比使用在适当的可迭代对象上映射 executor.submit 并使用 futures.as_completed 更差。你知道这是否属实吗? - Patrick Collins
4
我认为我的回答表达不清楚。“正确顺序”在这种情况下意味着值很可能按照您提交给“map”的列表中的顺序完成。对于大多数要使用map进行的计算,这是成立的。因为同一函数运行的时间通常(但不总是)相当。但是,如果您有一个具有非常不同运行时间的函数,则使用future.as_completed可能会更快。 - Kritzefitz
首先将s = [1,2,4,8],然后将s = [8,4,2,1]解决了我一些疑惑。 - Christian Pao.

31
以下是 .submit().map() 的示例。 它们都立即接受任务(提交|映射-开始)。完成时间相同,11秒钟 (最后结果时间 - 开始)。然而,.submit() 一旦 ThreadPoolExecutor 中的任何线程(maxThreads=2) 完成就会立即返回结果(无序!),而.map() 按照提交的顺序返回结果。
import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()    

输出:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

1
这是一个很好的答案。示例加1。 - Archit

29
如果您使用concurrent.futures.as_completed,您可以处理每个函数的异常。
import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    # -> using `executor.submit()` **requires** calling
    #      `concurrent.futures.as_completed()` <-
    #
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('resutl is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

executor.map中,如果出现异常,整个执行器都会停止。您需要在工作函数中处理异常。
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # -> Do not call `concurrent.futures.as_completed()`
    #    when using `executor.map()` <-
    #
    for each in executor.map(f, iterable):
        print(each)
# if there is any exception, executor.map would stop

谢谢你的解决方案,它帮了我很多! - Shubham K.

20
除了这里的答案解释外,直接访问源代码也可能会有所帮助。它证实了另一个答案的声明,即:
  • .map()按照提交顺序返回结果,而
  • 使用concurrent.futures.as_completed()迭代Future对象列表时,并不保证这种顺序,因为这就是as_completed()的特性。

.map()定义在基类concurrent.futures._base.Executor中:

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

正如您提到的,还有.submit(),需要在子类中定义,即ProcessPoolExecutorThreadPoolExecutor,返回一个_base.Future实例,您需要调用.result()来实际执行任何操作。

.map()的重要内容如下:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse() 加上 .pop() 是一种获取以先提交的结果(来自iterables)先被产生,第二个提交的结果被生成为第二个,依此类推的方法。所得到的迭代器的元素不是Future,而是实际的结果本身。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接