“Fire and forget” Python 异步/等待

204
有时候有些不关键的异步操作需要进行,但我不想等待它完成。在Tornado的协程实现中,您可以通过简单地省略yield关键字来"fire & forget"异步函数。
我一直在尝试弄清楚如何使用Python 3.5中发布的新的async/await语法进行"fire & forget"。例如,一个简化的代码片段:
async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

然而发生的是 bar() 没有执行,取而代之的是我们获得了运行时警告:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"

3
相关吗?https://dev59.com/wlwY5IYBdhLWcg3wRF9S 实际上,我认为这是一个重复的问题,但我不想立即进行关闭。有人可以确认一下吗? - tobias_k
8
@tobias_k,我认为这不是重复的问题。链接中的答案太笼统了,不能回答这个问题。 - Mikhail Gerasimov
3
你希望你的“主”进程是(1)永久运行?还是(2)允许进程停止,但让被遗忘的任务继续执行?或者(3)你希望在结束前主进程等待被遗忘的任务完成后再结束? - Julien Palard
6个回答

308

更新:

如果您正在使用Python >= 3.7,请将asyncio.ensure_future替换为asyncio.create_task,这是一种更新、更好的生成任务的方式。


asyncio.Task实现“启动并忘记”

根据Python文档中asyncio.Task的介绍,可以启动一些协程在后台执行。由asyncio.ensure_future创建的任务不会阻塞执行(因此函数将立即返回!)。这看起来像是实现“启动并忘记”的一种方法,正如您所要求的那样。

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

如果任务在事件循环完成后执行怎么办?

请注意,asyncio希望任务在事件循环完成时被完成。因此,如果您将main()更改为:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

您将在程序完成后收到此警告:
Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

为了防止这种情况,你可以在事件循环完成后只需等待所有未完成任务即可。
async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    
    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

杀死任务而不是等待它们完成

有时您不想等待任务完成(例如,某些任务可能被创建为永久运行)。在这种情况下,您可以取消它们而不是等待它们:cancel()

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

输出:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

4
这个语法只能在Python 3.5以上版本中使用。Python 3.4需要使用旧的语法(请参考https://docs.python.org/3.4/library/asyncio-task.html)。Python 3.3及以下版本根本不支持asyncio。 - Mikhail Gerasimov
1
@Sardathrion 我不确定任务是否指向创建它的线程,但你完全可以手动跟踪它们:例如,只需将在线程中创建的所有任务添加到列表中,当时间到来时,按照上面所述的方式取消它们。 - Mikhail Gerasimov
2
很好的答案。只是想指出,大多数情况下,等待所有挂起的任务完成或取消所有挂起的任务都不是理想的选择。我认为需要一种中间方式,最好是将您想要在关闭事件循环后继续执行的任务收集到一个“注册表”(列表、集合或其他内容)中,然后只需等待这些任务完成并取消所有其他任务。 - Torsten Engelbrecht
12
请注意:"Task.all_tasks()"自Python 3.7起已被弃用,请使用"asyncio.all_tasks()"代替。 - Alexis
1
如果要取消多个任务,最好将它们聚集在一起,而不是按顺序取消和等待每个任务:for task in tasks: task.cancel() 然后 await asyncio.gather(tasks, return_exceptions=True)return_exceptions=True 确保 CancelledError 被抑制。 - Anakhand
显示剩余9条评论

32

输出:

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

这里是一个简单的装饰器函数,它可以将执行推到后台,并使控制线移动到代码的下一行。主要优点是,您不必将函数声明为await
import asyncio
import time

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)

    return wrapped

@fire_and_forget
def foo():
    print("foo() started")
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

注意:请查看我的其他 答案,它使用纯 thread 而不是 asyncio 实现相同的功能。

2
我在使用这种方法创建每秒约5个小的fire-and-forget任务后,经历了显著的减速。不要在生产环境中用于长时间运行的任务。它会消耗你的CPU和内存! - pir
1
Django做得很好。不需要Celery等。我用它快速返回服务器响应给客户端请求,然后执行其他必要的操作,这些操作不依赖于服务器响应。包括使用Django ORM的操作,就像正常的执行流程一样。 - Роман Арсеньев
请注意,这仅适用于主线程;如果您尝试在另一个线程上执行此操作(至少对于Python 3.6和3.9),asyncio.get_event_loop()会引发RuntimeError异常。例如:threading.Thread(target=lambda: asyncio.get_event_loop()).start() 进行测试。 - Tom
你不需要在另一个线程中运行这个。在主线程中声明装饰器,然后在任何你想要的地方使用它。 - nehem
运行大量的线程,不好。 - Erik Aronesty
显示剩余2条评论

13

这并不完全是异步执行,但也许run_in_executor()适合您。

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')

def foo():
    #asynchronous stuff here


fire_and_forget(foo)

8
简洁明了的答案。值得注意的是,默认情况下executor会调用concurrent.futures.ThreadPoolExecutor.submit()。我提到这点是因为创建线程并非免费的;每秒火速启动1000个线程可能会给线程管理带来巨大压力。 - Brad Solomon
1
是的。我没有听从你的警告,使用这种方法创建每秒约5个小的fire-and-forget任务后,经历了显着的减速。不要在生产环境中使用它来运行长时间的任务。它会消耗你的CPU和内存! - pir
在这种情况下,使用进程执行器会更好吗?@BradSolomon - Rami Awar

11

如果由于某种原因无法使用 asyncio,则可以使用纯线程来实现。请查看我的其他答案和Sergey的答案。

import threading, time

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    print("foo() started")
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

生产

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

3
如果我们只需要asyncio中的“fire_and_forget”功能,而不需要其他功能,是否仍然最好使用asyncio?那有哪些好处呢? - pir
@pir 的好处是,你可以在 Web 服务器中拥有一个异步函数,并且有一个你想要在后台运行的任务,但尽快返回该函数。例如,API 路由连接到 DB(异步客户端),然后进行调用,然后关闭与 DB 的连接(异步关闭)。我不想等待 DB 连接关闭,我只想让它最终关闭。我更喜欢尽快返回 API 调用。这是 asyncio.ensure_future(client.close()) 的完美用例。快速执行并忘记它,然后返回 API 响应。 - Rami Awar
这里的问题是你无法保证线程不会失控。使用asyncio,它们只是被“调度”,但主线程最终将处理foo函数。该答案每次调用函数时都会随意生成线程,并且没有捕获问题的方法。这可能看起来类似于@nehem的另一个答案,但在内部功能上却非常不同。 - Jamie Marshall

0
有一个未指定的终止问题,因为“fire and forget”没有说明活动必须在何时完成(最终是否在程序终止时挂起或终止它们)。解决方案是使用上下文管理器。Python 3.11现在有了TaskGroup
一个早期的替代方案是aiowire包。它的上下文管理器有一个超时选项,并使用“trampoline”设计,允许异步函数返回异步函数。这样既避免了后台线程,也避免了无限的异步调用链。

-4
def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        threading.Thread(target=functools.partial(f, *args, **kwargs)).start()

    return wrapped

这是比上面更好的版本 -- 不使用asyncio


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接