存储结果的ThreadPoolExecutor

16

我对使用"concurrent.futures"进行并行处理还比较新,并正在测试一些简单的实验。我编写的代码似乎可以工作,但是我不确定如何存储结果。我尝试创建一个列表("futures") 并将结果附加到其中,但这会显著减慢过程。我想知道是否有更好的方法。谢谢。

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab={}
for i in range(100):
    for j in range(100):
       dtab[i,j]=i+j/2
       couple_ods.append((i,j))

avg_speed=100
def task(i):
    origin=i[0]
    destination=i[1]
    time.sleep(0.01)
    distance=dtab[origin,destination]/avg_speed
    return distance
start1=time.time()
def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
       for number in couple_ods:
          future=executor.submit(task,number)
          futures.append(future.result())

if __name__ == '__main__':
    main()
end1=time.time()
1个回答

49
当您调用future.result()时,它会阻塞直到值准备就绪。因此,在这里,您无法从并行性中获得任何好处-您启动一个任务,等待其完成,启动另一个任务,等待其完成,依此类推。

当然,您的示例一开始就不会从线程中受益。您的任务除了CPU绑定的Python计算以外什么也没做,这意味着(至少在CPython、MicroPython和PyPy中,它们是附带concurrent.futures的唯一完整实现),GIL(全局解释器锁)将防止多个线程同时进行。

希望您的真正的程序不同。如果它正在执行I/O绑定的操作(发出网络请求、读取文件等),或者使用像NumPy这样释放GIL的扩展库来处理重型CPU工作,则它将有效运行。否则,您需要在此处使用ProcessPoolExecutor


无论如何,您要做的是将future本身附加到列表中,以便在等待它们之前获取所有未来的列表:

for number in couple_ods:
    future=executor.submit(task,number)
    futures.append(future)
     

在启动了所有作业之后,您可以开始等待它们完成。当您需要更多控制时,有三个简单选项和一个复杂选项。


(1) 您可以直接循环遍历它们以按提交顺序等待它们:

for future in futures:
    result = future.result()
    dostuff(result)

(2) 如果您需要在执行任何工作之前等待它们全部完成,您只需调用 wait

futures, _ = concurrent.futures.wait(futures)
for future in futures:
    result = future.result()
    dostuff(result)

(3)如果您希望尽快处理每个任务,即使它们的顺序不同,请使用as_completed

for future in concurrent.futures.as_completed(futures): 
    dostuff(future.result())

注意到文档中使用此函数的示例提供了一些方法来识别哪个任务已完成。如果需要,可以简单地为每个任务传递一个索引,然后return index, real_result,然后您可以使用for index, result in ...进行循环。

(4) 如果需要更多控制,可以循环并wait等待目前为止完成的任何任务:

while futures:
    done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
    for future in done:
        result = future.result()
        dostuff(result)

这个例子与as_completed做的事情相同,但您可以对其进行微小变化以执行不同的操作,比如等待所有内容完成,但如果其中任何一个引发异常就提前取消。


对于许多简单情况,您可以使用执行程序的map方法来简化第一种选项。它的工作方式类似于内置的map函数,为参数中的每个值调用一次函数,然后提供可以循环遍历以按相同顺序获取结果的内容,但是它是并行执行的。所以:

for result in executor.map(task, couple_ods):
    dostuff(result)

1
我认为答案中的as_completed部分有一个小错误,应该是:for future in concurrent.futures.as_completed(futures): dostuff(future.result()) - slallum

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接