如何使用带有超时的concurrent.futures?

33

我正在尝试在Python 3.2中使用concurrent.futures模块使超时功能生效。但是当它超时时,它并没有真正停止执行。我尝试了线程和进程池执行器,但它们都无法停止任务,只有在完成任务后才会引发超时。所以,有人知道是否可能使其工作吗?

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0;
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")

if __name__ == '__main__':
    main()

你不应该随处使用超时参数。在executor.map或as_completed上的超时作为整个工作的超时(“如果在调用Executor.map()/as_completed()后的timeout秒内调用__next__()并且结果不可用,则返回的迭代器会引发TimeoutError。”)。在as_completed循环下,对future.result调用超时没有意义,因为迭代器不会产生尚未完成的future。 - Radu Simionescu
3个回答

35
据我所知,TimeoutError实际上是在你期望的时候被触发,而不是任务完成后触发。
然而,你的程序本身会一直运行,直到所有正在运行的任务都完成。这是因为当前执行的任务(在你的情况下,可能是所有已提交的任务,因为你的池大小等于任务数量)实际上并没有“被杀死”。
TimeoutError被引发,以便您可以选择不等待任务完成(并做其他事情),但任务将继续运行直到完成。只要Executor的线程/子进程中存在未完成的任务,Python就不会退出。
据我所知,目前无法仅“停止”当前正在执行的Futures,您只能“取消”尚未启动的预定任务。在您的情况下,不会有任何预定任务,但想象一下您有一个由5个线程/进程组成的池,您想要处理100个项目。在某些时刻,可能会有20个已完成的任务、5个正在运行的任务和75个已预定的任务。在这种情况下,您将能够取消那些76个预定的任务,但仍会继续运行这4个正在运行的任务,无论您是否等待结果。
即使无法以这种方式完成,我想应该有方法实现您想要的最终结果。也许这个版本能帮助您(不确定它是否完全符合您的预期,但可能会有所帮助):
import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False

    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0;
        for i in xrange(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number

    def interrupt(self):
        self.interrupt_requested = True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()


if __name__ == '__main__':
    main()

通过为每个“任务”创建一个可调用对象,并将它们提供给执行器而不仅仅是普通函数,您可以提供一种“中断”任务的方式。 提示:删除task.interrupt()行,看看会发生什么,这可能会使您更容易理解我上面的长篇说明 ;-)


13

我最近也遇到了这个问题,最终我想到了使用ProcessPoolExecutor的以下解决方案:


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")
            stop_process_pool(executor)

def stop_process_pool(executor):
    for pid, process in executor._processes.items():
        process.terminate()
    executor.shutdown()

txmc,您能杀死其中一个进程吗?还是必须全部杀死? - Glen Thompson
根据使用 stop_process_pool(executor) 的情况,我猜测您正在终止所有进程。 - Mattwmaster58
stop_process_pool 中的循环应该是 for pid, process in executor._processes.items(): (processes 应该改为 process),但 SO 不允许我编辑这么小的更改。 - pcarter
@pcarter,我修复了拼写错误。 - joemooney
只是为了澄清,这里的超时时间是以秒为单位,对吗? - Ashwanth A.R
在使用timeout参数对future.result(timeout=1)进行循环时,并没有意义,因为as_completed迭代器永远不会返回未完成的future... - Radu Simionescu

0

我的建议是使用ThreadPool而不是concurrent.futures。正如文档所说:

所有排队到ThreadPoolExecutor的线程都将在解释器退出之前加入。请注意,执行此操作的退出处理程序使用atexit添加的任何退出处理程序之前执行。这意味着必须捕获和处理主线程中的异常,以便优雅地退出线程。

在更复杂的情况下,整个程序会被卡住。以下代码片段足以说明我的意思,尽管有些偏离问题:

import concurrent.futures, time, datetime
from multiprocessing.pool import ThreadPool

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    i = 0
    while True:
        last_number = i * i
        i += 1
    return last_number

def origin():
    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
            try:
                for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                    print(future.result(timeout=1))
            except concurrent.futures._base.TimeoutError:
                print("This took to long...") # It suspends infinitely.
    except:
        print('Ending from origin.')

def update():
    try:
        with ThreadPool(len(max_numbers)) as pool:
            result = pool.map_async(run_loop, max_numbers)
            for num in result.get(2):
                print(num)
    except:
        print('Ending from update.')

if __name__ == '__main__':
    origin()
    # update()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接