asyncio CancelledError和KeyboardInterrupt问题

9
我正在尝试两种方式来停止无限循环的运行:
- supervisor_1: 通过编程方式取消任务 - supervisor_2: 使用 Ctrl+C 终止任务
虽然在中断时supervisor_2不会抛出任何错误,但我无法防止supervisor_1报错Task was destroyed but it is pending!。你有任何想法吗?
以下是代码:
import asyncio
import aioredis
from functools import partial



class Listener:
    def __init__(self, redis_conn):
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        counter = 0
        try:
            while True:
                print('{}: {}'.format(loop_name, counter))
                counter += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            self.redis_conn.close()
            await self.redis_conn.wait_closed()


async def supervisor_1(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2')))
    await asyncio.sleep(2)
    task.cancel()


async def supervisor_2(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)
    await asyncio.gather(l.forever('loop_1'), 
                         l.forever('loop_2'))


if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        task.cancel()
        loop.run_forever()
    finally:
        loop.close()

@更新:

感谢@Gerasimov,这里是修复问题的版本,但有时在按下键盘中断时仍会引发错误:

async def supervisor(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2'))
    )
    await asyncio.sleep(10)
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task

async def kill_tasks():
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task 

并且

if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        loop.run_until_complete(kill_tasks())
    finally:
        loop.close()
1个回答

12
task.cancel()本身并不会结束任务:它只是告诉任务在内部引发CancelledError,然后立即返回。您应该调用它并等待,直到任务被实际取消(当它引发CancelledError时)。
您还不应该在任务中抑制CancelledError
请阅读此答案,其中我试图展示使用任务的不同方式。例如,要取消某个任务并等待其被取消,可以执行以下操作:
from contextlib import suppress


task = ...  # remember, task doesn't suppress CancelledError itself

task.cancel()  # returns immediately, we should await task raised CancelledError.

with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if it happens after event loop stopped

# Now when we awaited for CancelledError and handled it, 
# task is finally over and we can close event loop without warning.

谢谢提供链接。我根据我理解的修复方法更新了我的答案。但是仍然出现错误(虽然不像以前那样频繁)。 - Orelus
1
@Orelus,和之前一样的错误吗?尝试将loop.run_until_complete(kill_tasks())移动到finally块中,在loop.close()之前。这应该可以解决问题。我不确定你的run()协程做了什么,但是可能会出现这种情况:当它完成时,但某些任务没有完成:在关闭事件循环时,即使没有发生KeyboardInterrupt,也会收到警告。 - Mikhail Gerasimov
请注意使用此方法的一个缺点:如果调用任务在等待子任务清理并退出时被取消,suppress() 调用将会吞噬 那个 CancelledError,实际上,await task 可能会被提前中止。看起来当前的 asyncio(至少在 3.8 版本)可能没有办法避免这种情况。 - Peter Hansen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接