如何使用asyncio定期执行一个函数?

109

我正在从tornado迁移到asyncio,但我找不到asynciotornadoPeriodicCallback等价物。(一个PeriodicCallback需要两个参数: 要运行的函数和调用之间的毫秒数)

  • asyncio中是否有这样的等价物?
  • 如果没有,最干净的方法是什么,可以避免在一段时间后出现RecursionError

https://gist.github.com/hirokiky/f4dae78b6d637f078e1c - Arunmu
为什么需要从tornado迁移?它们不能一起工作吗?http://www.tornadoweb.org/en/stable/asyncio.html - OneCricketeer
只需在您的函数中添加 await asyncio.sleep(time) - songololo
与Twisted相同,没有“LoopingCall”实现。 - zgoda
9个回答

97

对于Python版本低于3.5:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

对于 Python 3.5 及以上版本:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

6
即使在龙卷风中,我建议对于使用协程的应用程序,使用类似这样的循环而不是PeriodicCallback - Ben Darnell
9
简要说明: 不要直接创建Task实例;使用ensure_future()函数或AbstractEventLoop.create_task()方法。来自asyncio文档 - Torkel Bjørnson-Langen
30
或者您可以直接调用loop.call_later(5, task.cancel) - ReWrite
5
关于Python 3.7的一点说明:根据asyncio文档,我们应该使用高级别的asyncio.create_task()来创建Task - mhchia
我这边出现了一个错误 'RuntimeError: 这个事件循环已经在运行中' - undefined
显示剩余3条评论

39

当您感觉在asyncio程序的“后台”中应该发生某些事情时,asyncio.Task可能是一个很好的方式来实现它。您可以阅读此帖子以了解如何使用任务。

这里是一个可能的类实现,用于定期执行某个函数:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

让我们来测试一下:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

正如您在start中看到的那样,我们只是启动调用一些函数并在无限循环中睡眠一段时间的任务。在stop上,我们只是取消该任务。请注意,该任务应在程序完成时停止。

还有一件重要的事情是,您的回调函数不应花费太多时间被执行(否则它会冻结您的事件循环)。如果您计划调用某个长时间运行的func,可能需要在执行器中运行它


目前为止最完整和清晰的回答!谢谢。 把func作为协程来要求是一个好主意吗?这样我们就可以在_run方法中使用await self.func() - Sergey Belash
1
@SergeyBelash,没问题。请注意,由于我们在随机时间取消任务,因此您的函数也可能会在随机时间被取消。这意味着您函数内的每个await行都有可能引发CancelledError异常。但这对于所有异步函数来说都是实际情况(就像在常规非异步代码中可以随机引发KeyboardInterrupt一样)。 - Mikhail Gerasimov
我对此(以及其他答案)感到担忧,因为重复率可能不会完全等于时间值。如果函数需要相当长的时间来执行,它甚至都不会接近,并且在长时间内即使函数需要可忽略的时间,也会出现漂移。 - Ian Goldby
严格来说,start() 不需要是 async - fgiraldeau
这可以升级以支持普通函数和异步函数: async def _run(self): while True: await asyncio.sleep(self.time) # 支持普通函数和异步函数 res = self.func() if inspect.isawaitable(res): await res - Airstriker

34

一个可能有帮助的变体:如果你想让定期调用每n秒发生一次,而不是在上一次执行结束和下一次开始之间间隔n秒,并且你不想让调用在时间上重叠,则以下方法更简单:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

以下是使用它在后台运行几个任务的示例:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

谢谢!在我的服务器负载很重的时候,我遇到了这个问题,而且经过多次重复,我们开始出现了时钟偏差。这个方法优雅地解决了这个问题。 - Christian Oudard
2
为什么在主函数中使用ensure_future?为什么不直接使用await repeat(3, f)await repeat(2, g)呢? - marcoc88
如果您想让函数f或g返回一个值怎么办? - eugene
@marcoc88 如果你只是等待重复,剩下的代码将在第一个重复结束后执行。 - Burak Yildiz
@eugene 你可以通过将额外的回调函数传递给 repeat 函数来完成。每次 asyncio.gather 完成后,它会返回一个返回值列表。列表的第一个元素将是 func 的返回值。然后你可以简单地使用该值调用回调函数。 - Burak Yildiz

29

没有内置支持定期调用的方法。

只需创建自己的调度程序循环,让它休眠并执行任何已安排的任务:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)
       
        # execute any scheduled tasks
        async for task in scheduled_tasks(time.time()):
            await task()

scheduled_tasks()迭代器应该生成在给定时间准备运行的任务。请注意,产生计划并启动所有任务在理论上可能需要超过1秒的时间;这里的想法是调度程序生成自上次检查以来应该已经开始的所有任务。


4
@krs013: 那是一个不同的时钟;它不一定提供真实世界的时间(这取决于事件循环实现,可以测量CPU时间滴答声或另一个单调递增的时钟度量)。因为它不能保证以秒为单位提供测量值,所以在这里不应该使用它。 - Martijn Pieters
哦,好点,谢谢。我想这对于间隔计时来说已经足够了,但是似乎没有保证线程睡眠的准确性。我看到的实现似乎只是使用机器的纳秒运行时间,但是没错,你是对的。我现在想修复一些代码... - krs013
loop.time() 方法的 docstring 表示:“这是自纪元以来以秒为单位表示的浮点数,但是纪元、精度、准确性和漂移未指定,并且可能因事件循环而异。” 在这里,我解释为“SI 自纪元以来的秒数”,因此 CPU 时间片或其他非“均匀”时钟不能作为 loop.time() 的有效选择。由于 OP 仅要求每x毫秒进行定期回调,因此我认为 loop.time() 是合适的选择。 - Stefano M
@StefanoM:是的,它可能足够好,但这取决于事件循环实现,并且文档字符串为实现提供了充分的自由度。对于重复任务,它可能已经足够好了,但我的答案描述了一个调度程序,通常需要执行类似cron的操作(例如,在特定的真实时间运行任务)。 - Martijn Pieters
@ChristianOudard:同样的回答也适用于您:time.monotic()非常适合测量经过的时间,但不适合根据挂钟时间值安排项目。 - Martijn Pieters
显示剩余2条评论

13

使用装饰器实现的Python 3.7替代版本

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))

6

根据@A. Jesse Jiryu Davis的回答 (结合@Torkel Bjørnson-Langen和@ReWrite的评论),这是一种避免漂移的改进方法。

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

2
“periodic” 应该优先使用 loop.time() 而不是 time.time(),因为 loop.time()asyncio.sleep() 内部使用的时间参考。loop.time() 返回单调时间,而 time.time() 返回挂钟时间。当系统管理员修改系统日期或 NTP 调整挂钟时间时,两者将有所不同。 - user4815162342
这个答案需要更新到Python 3.10版本,我不认为在新的Python版本中有人还使用@asyncio.coroutine或loop.run_until_complete。 - PirateApp
@user4815162342 你能告诉我这个 loop.time 方法在哪里吗?我尝试搜索了一下,但没有搜到信息。我还尝试从 asyncio 中导入 loop 和从 asyncio.loop 中导入 time,但这些都不管用。 - PirateApp
1
@PirateApp 使用loop = asyncio.get_event_loop() - user4815162342

2
这个解决方案使用了装饰器概念,来自Fernando José Esteves de Souza,漂移工作区来自Wojciech Migda,并且使用一个超类来生成最优雅的代码,以应对异步周期性函数。

没有使用threading.Thread

该解决方案由以下文件组成:
  • periodic_async_thread.py,其中包含用于子类化的基类
  • a_periodic_thread.py,其中包含一个示例子类
  • run_me.py,其中包含一个示例实例化和运行
在文件periodic_async_thread.py中,PeriodicAsyncThread类:
import time
import asyncio
import abc

class PeriodicAsyncThread:
    def __init__(self, period):
        self.period = period

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

    @abc.abstractmethod
    async def run(self, *args, **kwargs):
        return

    def start(self):
        asyncio.run(self.run())

一个简单的子类APeriodicThread的示例在文件a_periodic_thread.py中:

from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio

class APeriodicThread(PeriodicAsyncThread):
    def __init__(self, period):
        super().__init__(period)
        self.run = self.periodic()(self.run)
    
    async def run(self, *args, **kwargs):
        await asyncio.sleep(2)
        print(time.time())

在文件run_me.py中实例化并运行示例类:

from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()

这段代码提供了一种优雅的解决方案,同时也缓解了其他解决方案中的时间漂移问题。输出结果类似于:

1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736

使用 threading.Thread

解决方案由以下文件组成:

  • async_thread.py:包含 Canopy 异步线程类。
  • periodic_async_thread.py:包含基类,供您进行子类化。
  • a_periodic_thread.py:包含示例子类。
  • run_me.py:包含实例化和运行的示例。

在文件 async_thread.py 中的 AsyncThread 类:

from threading import Thread
import asyncio
import abc

class AsyncThread(Thread):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    async def async_run(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        # loop = asyncio.new_event_loop()
        # asyncio.set_event_loop(loop)

        # loop.run_until_complete(self.async_run(*args, **kwargs))
        # loop.close()
        asyncio.run(self.async_run(*args, **kwargs))

PeriodicAsyncThread类位于periodic_async_thread.py文件中:

import time
import asyncio
from .async_thread import AsyncThread

class PeriodicAsyncThread(AsyncThread):
    def __init__(self, period, *args, **kwargs):
        self.period = period
        super().__init__(*args, **kwargs)
        self.async_run = self.periodic()(self.async_run)

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

一个简单的子类APeriodicThread的示例,在文件a_periodic_thread.py中:

import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio

class APeriodicAsyncTHread(PeriodicAsyncThread):
    async def async_run(self, *args, **kwargs):
        print(f"{current_thread().name} {time.time()} Hi!")
        await asyncio.sleep(1)
        print(f"{current_thread().name} {time.time()} Bye!")

在文件run_me.py中实例化并运行示例类:

from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()

这段代码代表了一种优雅的解决方案,同时还缓解了其他解决方案中的时间漂移问题。输出结果类似于:

a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!

如果计数无限增加,你不会遇到溢出吗? - PirateApp

1

对于多种类型的调度,我建议使用APSScheduler,该工具支持asyncio。

我将它用于一个简单的Python进程,可以使用docker启动并像cron一样每周执行某些操作,直到我停止docker/process。


0
这是我用asyncio测试周期性回调理论的方法。我没有使用Tornado的经验,所以不确定它如何处理周期性回调。我习惯于在Tkinter中使用after(ms, callback)方法,这就是我想出来的方法。对我来说,While True:看起来很丑陋,即使它是异步的(比全局变量更丑)。call_later(s, callback, *args)方法使用秒而不是毫秒。
import asyncio
my_var = 0
def update_forever(the_loop):
    global my_var
    print(my_var)
    my_var += 1 
    # exit logic could be placed here
    the_loop.call_later(3, update_forever, the_loop)  # the method adds a delayed callback on completion

event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接