concurrent.futures.ThreadPoolExecutor.map(): timeout not working

import concurrent.futures
import time

def process_one(i):
    try:
        print("dealing with {}".format(i))
        time.sleep(50)
        print("{} Done.".format(i))
    except Exception as e:
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        executor.map(process_one,
                     range(100),
                     timeout=3)


if __name__ == '__main__':
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:
        print(e)

According to the documentation:

The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map().

However, the script here doesn't raise any exception and just keeps waiting. Any suggestions?


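Do you want to kill the hanging tasks, or do you want the whole process_many call to take about 3 seconds or less? - arachnivore
@arachnivore Kill the hanging tasks and free the threads they occupy. - Hao Wang
Which Python version? - Mr_and_Mrs_D
2 Answers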


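As the documentation specifies, the timeout error is only raised when you call the __next__() method on the iterator returned by map(). To trigger that call, you can, for example, convert the output to a list: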

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results) 
    except futures.TimeoutError:
        print("TIMEOUT")

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5

Here the n-th task sleeps for n seconds, so the timeout occurs after task 2 has finished.
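A minimal sketch of the same point, using a hypothetical slow() task that sleeps for 0.5 s: map() submits every task up front and returns a lazy iterator immediately, and the deadline is only checked when the iterator is advanced with next().

```python
import concurrent.futures as cf
import time


def slow(x):
    # Hypothetical task that takes longer than the timeout used below.
    time.sleep(0.5)
    return x


def demo():
    with cf.ThreadPoolExecutor(max_workers=2) as ex:
        # map() only submits the tasks here; no exception is raised yet.
        it = ex.map(slow, [1, 2], timeout=0.1)
        try:
            # Requesting a result is what checks the deadline.
            next(it)
        except cf.TimeoutError:
            return "timeout raised by next()"
    return "no timeout"


print(demo())
```

Note that leaving the with block still waits for the running tasks, because the executor's shutdown joins its worker threads.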
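Edit: If you want to terminate the tasks that haven't finished, you can try the answers to this question (they don't use ThreadPoolExecutor.map()), or you can ignore the return values of the remaining tasks and let them finish: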
from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures.TimeoutError:
        print("TIMEOUT")
    print(outputs)

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5

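Either way, task(n) still gets executed (it prints "done with n"). Is there a way to interrupt it when the timeout is hit? I also tried the generator approach, calling next() explicitly, but got the same result. - Hao Wang
@HaoWang I've edited my answer to address this. However, I just realized that the second approach only works when the tasks are ordered by duration, i.e. when later tasks have longer delays, which makes it very impractical. I'll try to find another solution. - TrakJohnson
Note that looping over the results only triggers the timeout if you do it inside the with block. - dcompiled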
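The ordering limitation mentioned in the comments above comes from map() yielding results in submission order. One way around it, sketched here with a hypothetical task() and illustrative delays, is to submit the futures yourself and iterate with concurrent.futures.as_completed(..., timeout=...), which yields each future as soon as it finishes. This still cannot stop a thread that is already running; shutdown(cancel_futures=True) (Python 3.9+) only cancels tasks that have not started.

```python
import concurrent.futures as cf
import time


def task(n):
    # Hypothetical task: sleeps n seconds, then returns n.
    time.sleep(n)
    return n


def collect_within(delays, timeout):
    outputs = []
    ex = cf.ThreadPoolExecutor(max_workers=len(delays))
    futs = [ex.submit(task, d) for d in delays]
    try:
        # Futures are yielded in completion order, not submission order.
        for f in cf.as_completed(futs, timeout=timeout):
            outputs.append(f.result())
    except cf.TimeoutError:
        print("TIMEOUT")
    # Don't wait for stragglers; queued (not yet started) tasks are cancelled.
    ex.shutdown(wait=False, cancel_futures=True)
    return outputs


# The slowest task is submitted first, which would stall ex.map() until its deadline.
print(collect_within([0.8, 0.05, 0.1], timeout=0.4))
```

With ex.map() and the same inputs, no result would be collected at all, because the 0.8 s task has to be yielded first.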


As we can see in the source code (here for Python 3.7), map returns the iterator produced by an inner generator function:

def map(self, fn, *iterables, timeout=None, chunksize=1):
    ...
    if timeout is not None:
        end_time = timeout + time.time()
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

The TimeoutError is raised by the call yield fs.pop().result(end_time - time.time()), but you have to request a result to reach that call.
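Because end_time is computed once, when map() is called, timeout acts as a single deadline for the whole iteration rather than a per-item limit. A small sketch, assuming a hypothetical step() task of 0.4 s and a single worker so the tasks run one after another:

```python
import concurrent.futures as cf
import time


def step(x):
    # Hypothetical task taking about 0.4 s.
    time.sleep(0.4)
    return x


def results_before_deadline():
    got = []
    with cf.ThreadPoolExecutor(max_workers=1) as ex:
        # The deadline is fixed here: 0.6 s after this call.
        it = ex.map(step, [1, 2, 3], timeout=0.6)
        try:
            for value in it:
                got.append(value)
        except cf.TimeoutError:
            # Task 2 would finish at about 0.8 s, after the deadline.
            # map()'s finally clause cancels task 3, which has not started yet.
            pass
    return got


print(results_before_deadline())
```

Each task on its own finishes well within 0.6 s, yet only the first result should arrive before the shared deadline.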

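The idea is that you don't care about *submitting* the tasks: they have already been submitted and started running in background threads. What you care about is timing out when you request the results. That is a common use case: you submit tasks and then request their results within a limited time, rather than just submitting them and expecting them to terminate within a limited time.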

If the latter is what you care about, you can use wait instead; see, for example, Individual timeouts for concurrent.futures.
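A minimal sketch of the wait approach, using a hypothetical task() and illustrative delays: concurrent.futures.wait(fs, timeout=...) returns a (done, not_done) pair of sets once the timeout expires or everything has finished. Python threads cannot be killed from outside, so futures in not_done that are already running keep going in the background; cancel() only stops those still waiting in the queue.

```python
import concurrent.futures as cf
import time


def task(n):
    # Hypothetical task: sleeps n seconds, then returns n.
    time.sleep(n)
    return n


def run_with_deadline(delays, timeout, max_workers=2):
    ex = cf.ThreadPoolExecutor(max_workers=max_workers)
    futs = [ex.submit(task, d) for d in delays]
    done, not_done = cf.wait(futs, timeout=timeout)
    # cancel() returns True only for tasks that have not started running.
    cancelled = sum(f.cancel() for f in not_done)
    # Return without waiting for the tasks that are still running.
    ex.shutdown(wait=False)
    return sorted(f.result() for f in done), cancelled


print(run_with_deadline([0.05, 1.0, 1.0, 0.1], timeout=0.5))
```

With two workers, the 0.1 s task should still be queued behind the two 1 s tasks when the timeout expires, so it is the one that gets cancelled, while the two 1 s tasks run to completion in the background.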


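Thanks for pointing me in the right direction. I found the following answer to be the best solution so far: https://dev59.com/uGw15IYBdhLWcg3wmc4J#44719580 - Hao Wang
In my case, there is a huge pool of URLs, and I want to sample as many of them as possible (fetching each page's content), but I don't mind giving up on slow connections and moving on to the next one. - Hao Wang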

Page content provided by Stack Overflow.