什么是在 Python 中永久运行 asyncio 事件循环的惯用方法?

9

文档上看,启动异步应用程序的推荐方式是使用asyncio.run()函数,因此我的应用程序如下:

async def async_main():
    # Everything here can use asyncio.create_task():
    o = ObjectThatMustBeKeptReferenced()
    create_tasks_and_register_callbacks(o)

    # Wait forever, ugly:
    while True:
        await asyncio.sleep(10000)

asyncio.run(async_main())

async_main() 结尾的这个无限循环感觉非常不对。在其他语言中,这是我会一直调用事件循环的地方。因此,我尝试了以下代码:

def main():
    loop = asyncio.get_event_loop()

    # Everything here can use asyncio.create_task():
    o = ObjectThatMustBeKeptReferenced()
    create_tasks_and_register_callbacks(o)

    # Wait forever, pretty:
    loop.run_forever()

main()

问题在于,当我在我的函数中调用asyncio.create_task()时,即使事件循环已经创建并在线程上注册,也会出现类似RuntimeError: no running event loop的错误。

如何在asyncio事件循环上永远睡眠的Pythonic方式是什么?


任务和回调函数是否一直在运行?如果是,那么为什么需要自己的无限循环?如果不是,那么为什么要让它无限期地运行? - Tadhg McDonald-Jensen
是的,有的。我不需要自己的循环,但我不能退出函数,否则asyncio.run()会返回,我的程序就会退出,无限任务也不再被调度。 - lvella
我现在明白了,我之前认为asyncio.gather的等效操作是隐式完成的,但显然asyncio.run并没有这样做,也许我想到的是调用asyncio.async(async_main())然后永久运行,但我已经有一段时间没有使用Python异步了。对我来说,显式调用gather似乎是最好的选择。 - Tadhg McDonald-Jensen
asyncio.async()?也许您输错了,因为没有这样的函数。 - lvella
我发布了一个回答来解决async的历史问题,并且认为暴露已创建任务应该是API中的一个特性而不是漏洞 - Tadhg McDonald-Jensen
4个回答

7
您可以将sleep循环更改为从未设置的临时事件:
# wait forever
await asyncio.Event().wait()

如果需要,你可以轻松修改此操作,将其存储到变量中并作为关闭信号传播。

另一种选择是让创建任务的函数返回它创建的任务,这样即使(或正好因为)它们永远不会完成,你也可以等待它们:

async def async_main():
    o = ObjectThatMustBeKeptReferenced()
    tasks = create_tasks_and_register_callbacks(o)
    # wait forever, or until a task raises
    await asyncio.gather(*tasks)

虽然这并没有清晰地传达无限循环的意图,但它有一个优点:如果任何任务引发了未处理的异常,它将停止程序(并传播该异常)。

3
好的!现在看起来很明显!顺便说一下,我选择了await asyncio.Future()而不是使用Event,因为它是一个更轻量级的对象。 - lvella
2
这甚至有点诗意:等待未来!显然,它永远不会到来... - lvella
@lvella 你说得对,future更轻量级,更能体现诗意。:) 事件的重量不应该成为问题,因为你只需要创建一次。如果你正在使用future,请考虑使用loop.create_future()来适应其他事件循环,比如uvloop。对我来说,await asyncio.get_event_loop().create_future()似乎比await asyncio.Event().wait()稍微有点晦涩,因为事件是一个高级构造,比future更容易理解,但这可能是个人口味的问题。 - user4815162342
@lvella 另一个选择是仅等待您创建的任务,尽管它们永远不会停止。我已更新答案以提及此选项。 - user4815162342
1
我不喜欢等待其中一个任务的设计影响,因为它们被隐藏并且在各自的类中很好地隔离,将它们取回会导致API出现漏洞。 - lvella
@lvella 好的,没问题。不过我会在答案中保留这个建议,因为它在其他情境下也是有意义的。 - user4815162342

4
无限循环while True在基于asyncio的脚本中非常具有Python特色。它代表了您程序的循环。这将脚本逻辑与低级事件循环解耦,使您能够从应用程序逻辑的角度控制程序的生命周期。进行一些周期性的操作和检查。
这样的循环可以被包装在try-exceptfinally中,并放置在其他模块和类中,这意味着长期的周期性工作。这与asyncio保持一致,因为它使用协同式多任务处理,这样的循环将控制权交给事件循环(例如通过await asyncio.sleep)。 asyncio.run更可取,因为它仍然归结为调用run_forever,但在退出脚本的主协程时,它会取消并清除正在运行的任务、异步生成器等。
无限循环的实际示例: Uvicorn
    #...
    async def main_loop(self):
        counter = 0
        should_exit = await self.on_tick(counter)
        while not should_exit:
            counter += 1
            counter = counter % 864000
            await asyncio.sleep(0.1)
            should_exit = await self.on_tick(counter)

aiohttp:

        #...
        # sleep forever by 1 hour intervals,
        # on Windows before Python 3.8 wake up every 1 second to handle
        # Ctrl+C smoothly
        if sys.platform == "win32" and sys.version_info < (3, 8):
            delay = 1
        else:
            delay = 3600

        while True:
            await asyncio.sleep(delay)
    finally:
        await runner.cleanup()

1
请注意,睡眠1秒钟将防止进程进入睡眠状态(超过1秒钟),即使事件循环没有工作要做。这种情况在服务器上并不是什么大问题,但可能会耗尽笔记本电池。 - user4815162342

3
我过去的做法是使用asyncio.async,它是一个已被弃用的别名asyncio.ensure_future。人们面临的最大问题是在没有运行循环的情况下调用ensure_future,这不会确保任何事情发生,因此添加了更高级别的APIasyncio.create_task,以断言有一个运行循环,以便任务实际上能够运行。在您的情况下,您希望安排main任务然后永远运行循环,因此ensure_future的较低级API确实是您想要的:
async def async_main():
    # Everything here can use asyncio.create_task():
    o = ObjectThatMustBeKeptReferenced()
    create_tasks_and_register_callbacks(o)
    
asyncio.ensure_future(async_main())
asyncio.get_event_loop().run_forever()

这基本上意味着当循环运行时,我们确保async_main将被执行,但与asyncio.run不同的是,我们让循环永远运行而不仅仅是在完成主入口点之前运行。

但是你可以做得更好,无论一个给定的任务是否会永远运行,都不应该影响你,你应该理想地跟踪它们,并让你的函数await所有这些任务,如果其中一些任务永远运行,那么你的函数也将通过扩展永远运行,但如果没有,那么一旦所有异步任务都退出,你的程序也会退出。所需的全部就是跟踪每个create_task的返回值并像@user4815162342一样传递给asyncio.gather

为了实现这一点,类不必公开其生成的任务,只需提供一种等待它们的方法即可,这是一个示例,以说明这个想法:

class NetworkAsyncHandler():
    def __init__(self, some_data):
        # don't just create floating tasks, keep a record of them
        self._floating_coroutines = []
        self.remembered_data = some_data
        self.connect_to_some_server_idk_this_is_is_an_example("localhost")
    def connect_to_some_server_idk_this_is_is_an_example(self, host):
        # ALWAYS KEEP A REFERENCE TO THE FUTURES YOU CREATE
        a = asyncio.create_task(DO_STUFF(host, self.remembered_data))
        # assuming you were never explicitly awaiting on a yourself, then just add it to the list
        self._floating_coroutines.append(a)

    async def join(self):
        """awaits on all floating coroutines created"""
        await asyncio.gather(*self._floating_coroutines)


async def async_main():
    a = NetworkAsyncHandler("A")
    b = NetworkAsyncHandler("B")
    # this function will wait on all created subroutines and return only when all
    # sub routines end.  If some run forever then so do we.
    await asyncio.gather(a.join(), b.join())
    
# now this works exactly as you would want it to.
asyncio.run(async_main())

您可以想象,通常这个 NetworkAsyncHandler 可能会一直运行,但也许有一个信号可以从服务器接收到关闭它,导致它返回,如果两个服务器都收到了这个信号,您希望 Python 程序结束。第一种情况是调用 run_forever,这种情况不会发生,事实上,您甚至不知道例程是否仍在运行,但在这种架构中,每个函数都等待它实际依赖的例程,并且 asyncio.run 正是它旨在做的。


2
什么是在asyncio事件循环上永久睡眠的Pythonic方法? asyncio.run 是一种更高级的API,通常是运行事件循环的首选方式。但使用较低级别的 run_forever 也没有问题。
问题在于,当我调用 asyncio.create_task() 时,会出现类似于 RuntimeError: no running event loop 的错误。
这不起作用,因为 create_task 无法获取正在运行的事件循环。幸运的是,循环中也有一个 create_task 方法。您需要更新 create_tasks_and_register_callbacks 来接受循环作为参数。
def create_tasks_and_register_callbacks(obj, loop):

然后在其定义中将任何对asyncio.create_task的引用更改为loop.create_task


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接