如何在有待处理但未被取消保护的任务存在的情况下运行asyncio循环,直到没有这样的任务为止?

4

我正在尝试向现有的asyncio循环中添加一些代码,以便在按下Ctrl-C时进行干净的关闭。以下是它所做的事情的抽象示例。

import asyncio, signal

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

#def main():
#    try:
#        loop = asyncio.get_event_loop()
#        loop.create_task(aiomain())
#        loop.run_forever()
#    except asyncio.CancelledError:
#        pass

if __name__ == '__main__':
    main()

在这个例子中,假设需要完成task1task2的序列一旦开始,否则会留下不一致的状态。因此在调用tasks时使用了asyncio.shield包装器。
上面的代码中,如果我在脚本开始后很快就打断它,并且它只打印了“Starting simulated task1”,那么循环将停止,并且task2永远不会启动。如果我尝试切换到被注释掉的main版本,那么该版本将永远不会退出,即使循环已经被正确地取消并且至少在几分钟内不会发生任何其他事情。它确实有一些进展,因为它至少完成了task1task2的任何正在进行的序列。
以下是一些头脑风暴的可能解决方案,尽管我仍然感觉必须有一些更简单的东西我漏掉了:
- 创建一个asyncio.Condition对象来同步递增的变量,以运行被asyncio.shield保护的函数,然后递减该变量。然后,在aiomain中,等待变量达到零,然后重新引发CancelledError异常。(在实现中,我可能会将这些部分组合成一个类,并使用__aexit__实现等待零的逻辑CancelledError。) - 完全跳过使用asyncio的取消机制,而是使用asyncio.Event或类似的机制来允许中断点或可中断的睡眠。虽然这似乎更具侵入性,需要我指定哪些点被视为可中断的,而不是声明需要受到取消保护的序列。
2个回答

4
这是一个非常好的问题。在回答这个问题的过程中,我学到了一些东西,所以我希望你仍然关注这个主题。
首先要调查的是,shield() 方法是如何工作的? 在这一点上,文档非常令人困惑。 我直到读了 test_tasks.py 中标准库测试代码才弄明白它。 这是我的理解:
考虑以下代码片段:
async def coro_a():
    await asyncio.sheild(task_b())
    ...
task_a = asyncio.create_task(coro_a())
task_a.cancel()

当执行任务 task_a.cancel() 语句时,确实取消了 task_a 。await 语句立即抛出 CancelledError 异常,而不等待 task_b 完成。但是 task_b 将继续运行。外部任务(a)停止,但内部任务(b)不会停止。
这是您程序的修改版本,用于说明此情况。主要更改是在取消错误异常处理程序中插入等待,以使您的程序保持活动状态几秒钟。我在 Windows 上运行,这就是为什么我稍微更改了信号处理程序,但这只是一个小问题。我还向打印语句添加了时间戳。
import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    task = asyncio.create_task(task_loop())
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await asyncio.sleep(5.0)
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Output when ctrlC is struck during task1
# 
# Starting simulated task1 1590871747.8977509
# Got Control-C 1590871750.8385916
# True
# Shutting down task loop 1590871750.8425908
# Caught CancelledError 1590871750.8435903
# Finished simulated task1 1590871752.908434
# Starting simulated task2 1590871752.908434
# Program exit, cancelled 1590871755.8488846        

if __name__ == '__main__':
    main()

您可以看到,您的程序没有运行成功,因为一旦task_loop被取消,它就会退出,而task1和task2没有机会完成。 它们一直都在那里(或者更准确地说,如果程序继续运行,它们本应该在那里)。

这说明了shield()和cancel()之间的交互方式,但它并没有真正解决您所述的问题。 为此,我认为您需要一个可等待对象,以便在关键任务完成之前将程序保持活动状态。 这个对象需要在顶层创建,并向下传递到执行关键任务的地方。 下面是一个类似于您的程序,但表现出期望效果的示例程序。

我进行了三次运行:(1)在task1期间控制-C,(2)在task2期间控制-C,(3)在两个任务完成后控制-C。 在前两种情况下,程序会继续运行,直到task2完成。 在第三种情况下,程序立即结束。

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks(kwrap):
    fut = asyncio.get_running_loop().create_future()
    kwrap.awaitable = fut
    await task1()
    await task2()
    fut.set_result(1)

async def task_loop(kwrap):
    try:
        while True:
            await asyncio.shield(tasks(kwrap))
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    kwrap = KillWrapper()
    task = asyncio.create_task(task_loop(kwrap))
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await kwrap.awaitable
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

class KillWrapper:
    def __init__(self):
        self.awaitable = asyncio.get_running_loop().create_future()
        self.awaitable.set_result(0)

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Run 1 Control-C during task1
# Starting simulated task1 1590872408.6737766
# Got Control-C 1590872410.7344952
# True
# Shutting down task loop 1590872410.7354996
# Caught CancelledError 1590872410.7354996
# Finished simulated task1 1590872413.6747622
# Starting simulated task2 1590872413.6747622
# Finished simulated task2 1590872418.6750958
# Program exit, cancelled 1590872418.6750958
#
# Run 1 Control-C during task2
# Starting simulated task1 1590872492.927735
# Finished simulated task1 1590872497.9280624
# Starting simulated task2 1590872497.9280624
# Got Control-C 1590872499.5973852
# True
# Shutting down task loop 1590872499.5983844
# Caught CancelledError 1590872499.5983844
# Finished simulated task2 1590872502.9274273
# Program exit, cancelled 1590872502.9287038
#
# Run 1 Control-C after task2 -> immediate exit
# Starting simulated task1 1590873694.2925708
# Finished simulated task1 1590873699.2928336
# Starting simulated task2 1590873699.2928336
# Finished simulated task2 1590873704.2938952
# Got Control-C 1590873706.0790765
# True
# Shutting down task loop 1590873706.0804725
# Caught CancelledError 1590873706.0804725
# Program exit, cancelled 1590873706.0814824

谢谢 - 我发表了自己的答案,使用了类似的想法,但是加上了一些注释,为什么在“顶层”等待最终没有起作用,所以我的解决方案将等待移到了shield周围的包装器中。 - Daniel Schepler

0

这是我最终使用的代码:

import asyncio, signal

async def _shield_and_wait_body(coro, finish_event):
    try:
        await coro
    finally:
        finish_event.set()

async def shield_and_wait(coro):
    finish_event = asyncio.Event()
    task = asyncio.shield(_shield_and_wait_body(coro, finish_event))
    try:
        await task
    except asyncio.CancelledError:
        await finish_event.wait()
        raise

def shield_and_wait_decorator(coro_fn):
    return lambda *args, **kwargs: shield_and_wait(coro_fn(*args, **kwargs))

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

@shield_and_wait_decorator
async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            # Alternative to applying @shield_and_wait_decorator to tasks()
            #await shield_and_wait(tasks())
            await tasks()
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

def sigint_handler(task):
    print("Cancelling task loop")
    task.cancel()

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, sigint_handler, task)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    main()

与Paul Cornelius的答案类似,这会在子任务完成之前插入等待,然后才允许CancelledError向上传播到调用链。但是,除了在调用asyncio.shield的点之外,它不需要触及代码。
(在我的实际用例中,我同时运行了三个循环,使用asyncio.Lock确保一个任务或一系列任务在另一个任务开始之前完成。我还在该锁上使用了asyncio.Condition来从一个协程通信到另一个协程。当我尝试在aiomainmain中等待所有被保护任务完成时,我遇到了一个问题,即取消的父进程释放了锁,然后一个被保护的任务尝试使用该锁来发出条件变量信号,导致错误。将获取和释放锁移动到被保护的任务中也没有意义 - 这将导致任务B仍按顺序运行:被保护的任务A启动,任务B的协程过期并阻塞等待锁,Control+C。另一方面,在shield_and_wait调用点处等待可以整洁地避免过早释放锁。)
一个注意点:似乎shield_and_wait_decorator在类方法上无法正常工作。

棘手的问题。你的解决方案看起来很好,我喜欢装饰器的想法。我唯一能看到的显著区别是,在你的解决方案中,task_loop 直到 tasks() 完成后才“知道”取消操作。但我看不出这对应用逻辑有什么影响。在过去的两年里,我写了几个中等规模的 asyncio 应用程序,但我从未使用过 shield。现在我知道它的作用了。感谢你的挑战。 - Paul Cornelius
现在面临的下一个挑战是想出一种方法,使SIGTERM作为“更强烈的关闭请求”起作用,以便可以终止某些长时间运行的子进程。顺便说一句,如果有人对实际用例感到好奇:https://github.com/dschepler/microbuildd。 - Daniel Schepler
我在考虑大致如下:在一个受保护的例程中,可以运行 done, pending = await asyncio.wait({subsubtask, term_event})\nif term_event in done and subsubtask in pending:\n subsubtask.cancel()\n await subsubtask,然后在 asyncio 子进程对象周围创建一个包装器,在 CancelledError 上运行 subprocess.terminate()__aexit__ 处理程序。 - Daniel Schepler

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接