使用run_in_executor和asyncio时的超时处理

8
我正在使用asyncio来运行一段类似于以下阻塞代码的程序:
result = await loop.run_in_executor(None, long_running_function)

我的问题是:我能给long_running_function添加执行超时吗?

基本上我不希望long_running_function持续时间超过2秒,而且因为该函数来自第三方库,我无法在其中进行适当的超时处理。


1
您可以尝试将loop.run_in_executor()包装在wait_for()中,wait_for()函数接受一个超时参数。 - songololo
确实可以,谢谢! - Andrés Fernández
3个回答

11

关于取消长时间运行函数的警告:

虽然使用 loop.run_in_executor 返回的 Future 包装一个 asyncio.wait_for 调用将允许 事件循环 在一定时间后停止等待 long_running_function,但它不一定会停止底层的 long_running_function。这是 concurrent.futures 的缺点之一,据我所知,没有简单的方法来取消 concurrent.futures.Future


嗨,Jashandeep,你所说的“不一定会停止”是什么意思?你是指“不会停止”,还是可以指出任何条件,以便它不会停止? - Andrés Fernández
1
顺便问一下,我们讨论的不是 "asyncio.futures.Future" 对象(而不是 "concurrent.futures.Future")吗? - Andrés Fernández
1
它不会立即停止;也就是说,该函数将运行到完成。因此,如果其中有一个 while True: ... 循环,它将永远不会停止。--- asyncio 中的默认执行程序是 concurrent.futures.ThreadPoolExecutor,它又使用了 concurrent.futures.Futurerun_in_executor 调用 Executor.submit 并将返回的 concurrent.futures.Future 包装到 asyncio.Future 中,以便当一个完成时,另一个也会完成。 - Jashandeep Sohi
ThreadPoolExecutor会响应手动取消吗?(假设一旦引发了TimeOutError就调用)? - songololo
2
不行。在取消执行器时,它不会将下一个任务安排到池中,但它不会停止已经执行的作业。 从字面上讲,使用Python API没有优雅地停止线程的方法。虽然有一些hackery方法,但它们非常不可靠,我不建议使用它们。 - Andrew Svetlov
3
我遇到了同样的问题,超时已经触发,但是阻塞函数仍在运行,因此脚本就一直挂起。我看到这些评论已经有两年了,现在是否有解决方法? - steven2308

7
您可以使用asyncio.wait_for函数:
future = loop.run_in_executor(None, long_running_function)
result = await asyncio.wait_for(future, timeout, loop=loop)

2
太好了,谢谢!当超时达到时,该表达式将抛出 asyncio.futures.TimeoutError 异常。 - Andrés Fernández
2
警告:这是一个非常危险的答案。不要在生产中依赖它。它只是看起来做你想要的事情:https://gist.github.com/coxley/5879f5ceecfbb4624bee23a6cef47510 - coxley
@coxley 我尝试了你的例子,但无法重现你的错误。它按预期工作,没有打印出“hell yeah”。你正在运行哪个Python版本?我正在运行Python 3.6.8。也许你提到的问题已经被修复了? - chilicheech
@chilicheech:我在创建该代码片段时使用的是Python 3.7,在刚刚复现时使用了3.8.5。这并不是要修复的错误,更像是一种“陷阱”。异步取消根本不会影响执行器内部的工作。等待结果的未来是按预期被取消的。 - coxley

0

虽然没有使用run_in_executor,但我有一些关于“使用超时处理包装块函数异步”的解决方法。

import asyncio
import threading
import time
import ctypes


def terminate_thread(t: threading.Thread, exc_type=SystemExit):
    if not t.is_alive(): return
    try:
        tid = next(tid for tid, tobj in threading._active.items() if tobj is t)
    except StopIteration:
        raise ValueError("tid not found")
    if ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exc_type)) != 1:
        raise SystemError("PyThreadState_SetAsyncExc failed")


class AsyncResEvent(asyncio.Event):
    def __init__(self):
        super().__init__()
        self.res = None
        self.is_exc = False
        self._loop = asyncio.get_event_loop()

    def set(self, data=None) -> None:
        self.res = data
        self.is_exc = False
        self._loop.call_soon_threadsafe(super().set)

    def set_exception(self, exc) -> None:
        self.res = exc
        self.is_exc = True
        self._loop.call_soon_threadsafe(super().set)

    async def wait(self, timeout: float | None = None):
        await asyncio.wait_for(super().wait(), timeout)
        if self.is_exc:
            raise self.res
        else:
            return self.res


async def sub_thread_async(func, *args, _timeout: float | None = None, **kwargs):
    res = AsyncResEvent()

    def f():
        try:
            res.set(func(*args, **kwargs))
        except Exception as e:
            res.set_exception(e)
        except SystemExit:
            res.set_exception(TimeoutError)

    (t := threading.Thread(target=f)).start()
    try:
        return await res.wait(_timeout)
    except TimeoutError:
        raise TimeoutError
    finally:
        if not res.is_set():
            terminate_thread(t)


_lock = threading.Lock()


def test(n):
    _tid = threading.get_ident()
    for i in range(n):
        with _lock:
            print(f'print from thread {_tid} ({i})')
        time.sleep(1)
    return n


async def main():
    res_normal = await asyncio.gather(*(sub_thread_async(test, 5) for _ in range(2)))
    print(res_normal)  # [5,5]
    res_normal_2 = await asyncio.gather(*(sub_thread_async(test, 2, _timeout=3) for _ in range(2)))
    print(res_normal_2)  # [2,2]
    res_should_not_get = await asyncio.gather(*(sub_thread_async(test, 5, _timeout=3) for _ in range(2)))
    print(res_should_not_get)  # timeout error


if __name__ == '__main__':
    asyncio.new_event_loop().run_until_complete(main())


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接