我正在从tornado
迁移到asyncio
,但我找不到asyncio
中tornado
的PeriodicCallback
等价物。(一个PeriodicCallback
需要两个参数: 要运行的函数和调用之间的毫秒数)
asyncio
中是否有这样的等价物?- 如果没有,最干净的方法是什么,可以避免在一段时间后出现
RecursionError
?
我正在从tornado
迁移到asyncio
,但我找不到asyncio
中tornado
的PeriodicCallback
等价物。(一个PeriodicCallback
需要两个参数: 要运行的函数和调用之间的毫秒数)
asyncio
中是否有这样的等价物?RecursionError
?对于Python版本低于3.5:
import asyncio
@asyncio.coroutine
def periodic():
while True:
print('periodic')
yield from asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
对于 Python 3.5 及以上版本:
import asyncio
async def periodic():
while True:
print('periodic')
await asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
PeriodicCallback
。 - Ben DarnellTask
实例;使用ensure_future()
函数或AbstractEventLoop.create_task()
方法。来自asyncio文档。 - Torkel Bjørnson-Langenloop.call_later(5, task.cancel)
。 - ReWrite当您感觉在asyncio程序的“后台”中应该发生某些事情时,asyncio.Task
可能是一个很好的方式来实现它。您可以阅读此帖子以了解如何使用任务。
这里是一个可能的类实现,用于定期执行某个函数:
import asyncio
from contextlib import suppress
class Periodic:
def __init__(self, func, time):
self.func = func
self.time = time
self.is_started = False
self._task = None
async def start(self):
if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
async def stop(self):
if self.is_started:
self.is_started = False
# Stop task and await it stopped:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _run(self):
while True:
await asyncio.sleep(self.time)
self.func()
让我们来测试一下:
async def main():
p = Periodic(lambda: print('test'), 1)
try:
print('Start')
await p.start()
await asyncio.sleep(3.1)
print('Stop')
await p.stop()
await asyncio.sleep(3.1)
print('Start')
await p.start()
await asyncio.sleep(3.1)
finally:
await p.stop() # we should stop task finally
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
正如您在start
中看到的那样,我们只是启动调用一些函数并在无限循环中睡眠一段时间的任务。在stop
上,我们只是取消该任务。请注意,该任务应在程序完成时停止。
还有一件重要的事情是,您的回调函数不应花费太多时间被执行(否则它会冻结您的事件循环)。如果您计划调用某个长时间运行的func
,可能需要在执行器中运行它。
func
作为协程来要求是一个好主意吗?这样我们就可以在_run
方法中使用await self.func()
。 - Sergey Belashstart()
不需要是 async
。 - fgiraldeau async def _run(self):
while True:
await asyncio.sleep(self.time)
# 支持普通函数和异步函数
res = self.func()
if inspect.isawaitable(res):
await res
- Airstriker一个可能有帮助的变体:如果你想让定期调用每n秒发生一次,而不是在上一次执行结束和下一次开始之间间隔n秒,并且你不想让调用在时间上重叠,则以下方法更简单:
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
以下是使用它在后台运行几个任务的示例:
async def f():
await asyncio.sleep(1)
print('Hello')
async def g():
await asyncio.sleep(0.5)
print('Goodbye')
async def main():
t1 = asyncio.ensure_future(repeat(3, f))
t2 = asyncio.ensure_future(repeat(2, g))
await t1
await t2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
await repeat(3, f)
和await repeat(2, g)
呢? - marcoc88没有内置支持定期调用的方法。
只需创建自己的调度程序循环,让它休眠并执行任何已安排的任务:
import math, time
async def scheduler():
while True:
# sleep until the next whole second
now = time.time()
await asyncio.sleep(math.ceil(now) - now)
# execute any scheduled tasks
async for task in scheduled_tasks(time.time()):
await task()
scheduled_tasks()
迭代器应该生成在给定时间准备运行的任务。请注意,产生计划并启动所有任务在理论上可能需要超过1秒的时间;这里的想法是调度程序生成自上次检查以来应该已经开始的所有任务。
loop.time()
方法的 docstring 表示:“这是自纪元以来以秒为单位表示的浮点数,但是纪元、精度、准确性和漂移未指定,并且可能因事件循环而异。” 在这里,我解释为“SI 自纪元以来的秒数”,因此 CPU 时间片或其他非“均匀”时钟不能作为 loop.time()
的有效选择。由于 OP 仅要求每x毫秒进行定期回调,因此我认为 loop.time()
是合适的选择。 - Stefano Mtime.monotic()
非常适合测量经过的时间,但不适合根据挂钟时间值安排项目。 - Martijn Pieters使用装饰器实现的Python 3.7替代版本
import asyncio
import time
def periodic(period):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
return wrapper
return scheduler
@periodic(2)
async def do_something(*args, **kwargs):
await asyncio.sleep(5) # Do some heavy calculation
print(time.time())
if __name__ == '__main__':
asyncio.run(do_something('Maluzinha do papai!', secret=42))
根据@A. Jesse Jiryu Davis的回答 (结合@Torkel Bjørnson-Langen和@ReWrite的评论),这是一种避免漂移的改进方法。
import time
import asyncio
@asyncio.coroutine
def periodic(period):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * period - time.time(), 0)
g = g_tick()
while True:
print('periodic', time.time())
yield from asyncio.sleep(next(g))
loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
loop.time()
而不是 time.time()
,因为 loop.time()
是 asyncio.sleep()
内部使用的时间参考。loop.time()
返回单调时间,而 time.time()
返回挂钟时间。当系统管理员修改系统日期或 NTP 调整挂钟时间时,两者将有所不同。 - user4815162342loop = asyncio.get_event_loop()
。 - user4815162342periodic_async_thread.py
,其中包含用于子类化的基类a_periodic_thread.py
,其中包含一个示例子类run_me.py
,其中包含一个示例实例化和运行periodic_async_thread.py
中,PeriodicAsyncThread
类:import time
import asyncio
import abc
class PeriodicAsyncThread:
def __init__(self, period):
self.period = period
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
@abc.abstractmethod
async def run(self, *args, **kwargs):
return
def start(self):
asyncio.run(self.run())
一个简单的子类APeriodicThread
的示例在文件a_periodic_thread.py
中:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio
class APeriodicThread(PeriodicAsyncThread):
def __init__(self, period):
super().__init__(period)
self.run = self.periodic()(self.run)
async def run(self, *args, **kwargs):
await asyncio.sleep(2)
print(time.time())
在文件run_me.py
中实例化并运行示例类:
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
这段代码提供了一种优雅的解决方案,同时也缓解了其他解决方案中的时间漂移问题。输出结果类似于:
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
解决方案由以下文件组成:
async_thread.py
:包含 Canopy 异步线程类。periodic_async_thread.py
:包含基类,供您进行子类化。a_periodic_thread.py
:包含示例子类。run_me.py
:包含实例化和运行的示例。在文件 async_thread.py
中的 AsyncThread
类:
from threading import Thread
import asyncio
import abc
class AsyncThread(Thread):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
async def async_run(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(self.async_run(*args, **kwargs))
# loop.close()
asyncio.run(self.async_run(*args, **kwargs))
PeriodicAsyncThread
类位于periodic_async_thread.py
文件中:
import time
import asyncio
from .async_thread import AsyncThread
class PeriodicAsyncThread(AsyncThread):
def __init__(self, period, *args, **kwargs):
self.period = period
super().__init__(*args, **kwargs)
self.async_run = self.periodic()(self.async_run)
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
一个简单的子类APeriodicThread
的示例,在文件a_periodic_thread.py
中:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio
class APeriodicAsyncTHread(PeriodicAsyncThread):
async def async_run(self, *args, **kwargs):
print(f"{current_thread().name} {time.time()} Hi!")
await asyncio.sleep(1)
print(f"{current_thread().name} {time.time()} Bye!")
在文件run_me.py
中实例化并运行示例类:
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
这段代码代表了一种优雅的解决方案,同时还缓解了其他解决方案中的时间漂移问题。输出结果类似于:
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
对于多种类型的调度,我建议使用APSScheduler,该工具支持asyncio。
我将它用于一个简单的Python进程,可以使用docker启动并像cron一样每周执行某些操作,直到我停止docker/process。
after(ms, callback)
方法,这就是我想出来的方法。对我来说,While True:
看起来很丑陋,即使它是异步的(比全局变量更丑)。call_later(s, callback, *args)
方法使用秒而不是毫秒。import asyncio
my_var = 0
def update_forever(the_loop):
global my_var
print(my_var)
my_var += 1
# exit logic could be placed here
the_loop.call_later(3, update_forever, the_loop) # the method adds a delayed callback on completion
event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
await asyncio.sleep(time)
。 - songololo