如何使用asyncio安排和取消任务

14

我正在编写一个客户端-服务器应用程序。当连接时,客户端向服务器发送一个 "心跳" 信号,例如每秒钟一次。 在服务器端,我需要一个机制,可以添加要异步执行的任务(或协程或其他内容)。此外,当客户端停止发送“心跳”信号时,我想从客户端取消任务。

换句话说,当服务器启动任务时,它具有超时时间或 ttl,例如 3 秒钟。当服务器收到“心跳”信号时,它会重新设置计时器,以便再等待 3 秒钟,直到任务完成或客户端断开连接(停止发送信号)。

这是一个关于取消来自 asyncio 在 pymotw.com 上的教程中任务的示例。但是这里的任务被取消之前,事件循环尚未开始,这对我不太适合。

import asyncio

async def task_func():
    print('in task_func')
    return 'the result'


event_loop = asyncio.get_event_loop()
try:
    print('creating task')
    task = event_loop.create_task(task_func())

    print('canceling task')
    task.cancel()

    print('entering event loop')
    event_loop.run_until_complete(task)
    print('task: {!r}'.format(task))
except asyncio.CancelledError:
    print('caught error from cancelled task')
else:
    print('task result: {!r}'.format(task.result()))
finally:
    event_loop.close()

你是否不喜欢使用像 Celery 这样的工具来进行异步处理? - Keith Bailey
@KeithBailey 实际上,Celery 可能会有所帮助;但如果有一个使用 asyncio 的解决方案,那将会更好,因为它是标准库的一部分,而不是第三方库。 - Sergey Belash
我大概知道如何在celery中实现它,在asyncio中看起来可以使用队列类型来完成(有一个asyncio和一个multiprocessing的队列类型)。这使您可以定义队列和消费者,然后从队列中删除项目。但我不认为它会让你停止正在运行的任务。 - Keith Bailey
在Asyncio中有一个任务包装器,可以通过ensure_future()方法调用。它还具有一个取消方法。 - songololo
@shongololo 我知道这个链接:https://pymotw.com/3/asyncio/tasks.html#creating-tasks-from-coroutines 但是如果事件循环已经在运行,我不知道如何使用它。你能提供一个例子吗? - Sergey Belash
@SergeyBelash 发布了一个通用答案,但请注意我还没有测试过。 - songololo
2个回答

26
您可以使用 asyncioTask 包装器通过 ensure_future() 方法执行任务。 ensure_future 会自动将协程包装在一个 Task 包装器中,并将其附加到您的事件循环中。然后,Task 包装器也会确保协程从 awaitawait 语句(或直到协程完成)。
换句话说,只需将常规协程传递给 ensure_future 并将结果的 Task 对象分配给变量即可。随时可以调用 Task.cancel() 停止执行该任务。
import asyncio

async def task_func():
    print('in task_func')
    # if the task needs to run for a while you'll need an await statement
    # to provide a pause point so that other coroutines can run in the mean time
    await some_db_or_long_running_background_coroutine()
    # or if this is a once-off thing, then return the result,
    # but then you don't really need a Task wrapper...
    # return 'the result'

async def my_app():
    my_task = None
    while True:
        await asyncio.sleep(0)

        # listen for trigger / heartbeat
        if heartbeat and my_task is None:
            my_task = asyncio.ensure_future(task_func())

        # also listen for termination of hearbeat / connection
        elif not heartbeat and my_task:
            if not my_task.cancelled():
                my_task.cancel()
            else:
                my_task = None

run_app = asyncio.ensure_future(my_app())
event_loop = asyncio.get_event_loop()
event_loop.run_forever()

请注意,任务是为需要在后台持续工作而不中断主要流程的长时间运行任务而设计的。如果您只需要快速一次性方法,则直接调用函数即可。

4
自Python 3.7起,应使用asyncio.create_task() - iFreilicht

0
你可以给每个任务传递一个取消令牌,然后调用该令牌的cancel()方法。
这个模板是在库中实现的。使用以下命令安装它:
pip install cantok

文档中了解更多关于该图书馆的信息。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接