如何使用asyncio在执行器中调度任务?

5

我一直在使用asyncio进行并发编程,但现在遇到了一个问题。我需要使用asyncio调度任务,但是该任务会阻塞进程,因此我希望使用concurrent.futures中的线程池(threadpool)在执行器(executor)中执行。

我看过像这样的示例,可以通过以下方式调度任务:

now = loop.time()
loop.call_at(now + 60, callback, arg, loop)

就像这样运行在执行器中的任务:

blocking_tasks = [
    loop.run_in_executor(executor, blocks)
    for i in range(6)
]
completed, pending = await asyncio.wait(blocking_tasks)

但是我如何在执行器中安排一个阻塞任务的运行呢?
2个回答

5

run_in_executor 返回一个 future,因此无法与需要普通函数的 call_at 一起使用。但是,您可以轻松地延迟执行,使用 asyncio.sleep()

async def my_task():
    await asyncio.sleep(60)
    result = await loop.run_in_executor(None, fn)
    ...

taskobj = loop.create_task(my_task())

这样做的好处是通过create_task创建的任务可以在休眠期间取消。此外,您可以从my_task()返回有用的值,并使用await taskobj、调用taskobj.result()loop.run_until_complete(taskobj)获取它。


3
您可以创建如下所示的包装器来处理这个问题。
def run_in_async_loop(f):
    @functools.wraps(f)
    async def wrapped(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return (await loop.run_in_executor(None, f(*args, **kwargs)))
    return wrapped

1
在第一个 return 行后面添加一个 ) 以避免语法错误。 - Jakub Bláha

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接