在单独的线程中异步执行 "fire and forget" 任务的 Asyncio

6
我有一个长时间运行的同步Python程序,我想每秒运行约10个“fire and forget”任务。这些任务会命中远程API,并且不需要返回任何值。我尝试了这个答案,但是它需要太多的CPU/内存来生成和维护所有单独的线程,所以我一直在研究asyncio。 这个答案很好地解释了如何使用asyncio运行“fire and forget”。然而,它需要使用run_until_complete(),这会等到所有的asyncio任务完成才会结束。我的程序正在使用同步Python,所以这对我没有用。理想情况下,代码应该像这样简单,其中log_remote不会阻塞循环:
while True:
    latest_state, metrics = expensive_function(latest_state)
    log_remote(metrics) # <-- this should be run as "fire and forget"

我使用的是Python 3.7。如何在另一个线程上轻松地使用asyncio运行它?

1个回答

7

您可以在单个后台线程中启动一个事件循环,并将其用于所有fire&forget任务。例如:

import asyncio, threading

_loop = None

def fire_and_forget(coro):
    global _loop
    if _loop is None:
        _loop = asyncio.new_event_loop()
        threading.Thread(target=_loop.run_forever, daemon=True).start()
    _loop.call_soon_threadsafe(asyncio.create_task, coro)

有了这个,你只需要在调用 async def 创建的协程对象上调用 fire_and_forget 函数:

# fire_and_forget defined as above

import time

async def long_task(msg):
    print(msg)
    await asyncio.sleep(1)
    print('done', msg)

fire_and_forget(long_task('foo'))
fire_and_forget(long_task('bar'))
print('continuing with something else...')
time.sleep(3)

请注意,log_remote 需要使用 asyncio、aiohttp 等并改写为 async def 的形式,而不是使用 requests 等同步方式。

这样做是每次创建一个新线程,而不是使用一个单独持续运行的线程吗? - Alexis.Rolland
1
@Alexis.Rolland 不,fire_and_forget() 会在第一次调用时小心地创建/启动一个新线程。 - user4815162342
如果Python GIL一次只允许运行一个线程,那么线程如何与asyncio一起工作?当异步I/O任务完成时,线程是否真的退出? - thegreatcoder
@thegreatcoder GIL 允许单个线程一次运行,这会阻止您利用多个核心。在多个线程中运行的代码仍然会呈现出并行运行的状态,类似于在单核系统上执行的代码。在此设置中,运行事件循环的线程仅启动一次,并且在异步任务完成时不会退出;它仍然在后台睡眠,等待进一步的任务。 - user4815162342
感谢澄清。那么当主线程退出上述程序时,该线程将在后台运行?我试图弄清楚如果有全局解释器锁(GIL),额外的线程实际上何时会被执行。 - thegreatcoder
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接