如何在不使用await的情况下调用异步函数?

68

我在aiohttp应用程序中有一个控制器操作。

async def handler_message(request):

    try:
        content = await request.json()
        perform_message(x,y,z)
    except (RuntimeError):
        print("error in perform fb message")
    finally:
        return web.Response(text="Done")

perform_message 是一个异步函数。当我调用这个函数时,我希望我的动作尽快返回,并将 perform_message 放入事件循环中。

这样一来,perform_message 就不会被立即执行。

5个回答

81
一种方法是使用 create_task 函数:
import asyncio

async def handler_message(request):
    ...
    loop = asyncio.get_event_loop()
    loop.create_task(perform_message(x,y,z))
    ...

根据循环文档,从Python 3.10开始,asyncio.get_event_loop()已被弃用。如果您正在尝试从协程/回调获取循环实例,则应改用asyncio.get_running_loop()。如果从主线程调用此方法,它将无法工作,此时必须实例化一个新的循环:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(perform_message(x, y, z))
loop.run_forever()

此外,如果在程序运行期间只调用一次且不需要实例化其他循环(不太可能),则可以使用以下代码:
asyncio.run(perform_message(x, y, z))

此函数创建一个事件循环,并在协程结束时终止它,因此应仅在上述情况下使用。


4
是的,但在等待之前它仍未开始,对吗?如果您想创建它、启动它、做其他事情,然后再等待它(它可能已经完成了,如果没有,它会阻塞直到完成)。 - Jeppe
6
@Jeppe 不会,create_task 会尽快启动它。我鼓励你测试它(完全不需要等待结果)。 - freakish
8
@HossamAl-Dokkani 兄弟,你评论之前能不能先检查一下呢?它已经开始任务了。你不需要等待它。结束了。 - freakish
4
听着,如果循环没有运行,就无法在其中运行异步函数。你必须要启动循环。你可以使用 loop.run_until_complete(coro) 或者 loop.run_forever() 来启动循环。参见:https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#running-and-stopping-the-loop 在你的情况下,似乎你正在寻找第一种选项。 - freakish
3
OP想要在事件循环中运行任务,可能是为了安排它在稍后执行而不是立即执行。我们几乎无法控制事件循环以及何时执行任务。此外,“立即”一词的定义因为在多任务环境中你几乎永远无法保证这种情况。 - freakish
显示剩余22条评论

11

简单来说:

import asyncio
from datetime import datetime

def _log(msg : str):
    print(f"{datetime.utcnow()} {msg}")

async def dummy(name, delay_sec):
    _log(f"{name} entering ...")
    await asyncio.sleep(delay_sec)
    _log(f"{name} done for the day!")

async def main():
    asyncio.create_task(dummy('dummy1', 5)) # last to finish
    asyncio.create_task(dummy('dummy2', 3)) # 2nd
    asyncio.create_task(dummy('dummy3', 1)) # First to finish

    _log(f"Yo I didn't wait for ya!")
    await asyncio.sleep(10)

asyncio.get_event_loop().run_until_complete(main())

输出:

2022-09-18 00:53:13.428285 Yo I didn't wait for ya!
2022-09-18 00:53:13.428285 dummy1 entering ...
2022-09-18 00:53:13.428285 dummy2 entering ...
2022-09-18 00:53:13.428285 dummy3 entering ...
2022-09-18 00:53:14.436801 dummy3 done for the day!
2022-09-18 00:53:16.437226 dummy2 done for the day!
2022-09-18 00:53:18.424755 dummy1 done for the day!

0
问题陈述:我在Azure Function App上使用FastApi,同时也使用了MongoDB的Beanie ODM。Beanie是一个异步ODM,因此需要在启动事件中进行初始化。但由于Azure Function是无服务器的,所以这不容易实现,而且我也不能使用asyncio.run(),因为FastAPI正在处理自己的异步循环。为了解决这个问题,我花了两天时间,最终修复了它。
以下是我是如何做到的,我不确定这种方法的缺点,但如果有人知道,请告诉我。
connection_string = os.environ["DCS"]
logging.info(f"connection string {connection_string}")
async def init():
    client = AsyncIOMotorClient(connection_string)
    await init_beanie(database=client['Tradex'],document_models=[Admin,User])

loop = asyncio.get_running_loop()
asyncio.set_event_loop(loop)
loop.create_task(init())

-1
你可以称之为多进程池。
from multiprocessing import Pool
import time

def f(x):
    return x*x

def postprocess(result):
    print("finished: %s" % result)

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback=postprocess) # Evaluate "f(10)" asynchronously calling callback when finished.
    print("waiting...")
    time.sleep(1)

-2
另一种方法是使用ensure_future函数:
import asyncio

async def handler_message(request):
...
loop = asyncio.get_event_loop()
loop.ensure_future(perform_message(x,y,z))
...

8
这会引发一个AttributeError: '_UnixSelectorEventLoop' object has no attribute 'ensure_future'的错误。ensure_futureasyncio模块中的一个函数,而不是事件循环的方法。如果只涉及协程,则基本上与loop.create_task相同。 - Gustavo Bezerra

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接