我正在使用Python3编写一个应用程序,并尝试首次使用asyncio。我遇到的一个问题是,我的一些协程阻塞了事件循环的时间超过了我的期望。我想找到类似于事件循环的顶部的东西,可以显示每个协程运行所花费的墙壁/ CPU时间。如果不存在这样的工具,有人知道如何为事件循环添加挂钩以便我可以进行测量吗?
我已经尝试使用cProfile,它可以提供一些有用的输出,但我更关注阻塞事件循环的时间,而不是总执行时间。
我正在使用Python3编写一个应用程序,并尝试首次使用asyncio。我遇到的一个问题是,我的一些协程阻塞了事件循环的时间超过了我的期望。我想找到类似于事件循环的顶部的东西,可以显示每个协程运行所花费的墙壁/ CPU时间。如果不存在这样的工具,有人知道如何为事件循环添加挂钩以便我可以进行测量吗?
我已经尝试使用cProfile,它可以提供一些有用的输出,但我更关注阻塞事件循环的时间,而不是总执行时间。
事件循环已经可以追踪协程执行时是否需要大量CPU时间。要查看它,您应该使用set_debug
方法启用调试模式:
import asyncio
import time
async def main():
time.sleep(1) # Block event loop
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_debug(True) # Enable debug
loop.run_until_complete(main())
在输出中,您会看到:
Executing <Task finished coro=<main() [...]> took 1.016 seconds
默认情况下,它会显示协程阻塞超过0.1秒的警告。这并没有被记录在文档中,但根据 asyncio 源代码,看起来你可以更改 slow_callback_duration
属性来修改此值。
call_later
。定期运行回调函数,该函数将记录/通知循环时间与周期间隔时间之间的差异。class EventLoopDelayMonitor:
def __init__(self, loop=None, start=True, interval=1, logger=None):
self._interval = interval
self._log = logger or logging.getLogger(__name__)
self._loop = loop or asyncio.get_event_loop()
if start:
self.start()
def run(self):
self._loop.call_later(self._interval, self._handler, self._loop.time())
def _handler(self, start_time):
latency = (self._loop.time() - start_time) - self._interval
self._log.error('EventLoop delay %.4f', latency)
if not self.is_stopped():
self.run()
def is_stopped(self):
return self._stopped
def start(self):
self._stopped = False
self.run()
def stop(self):
self._stopped = True
例子
import time
async def main():
EventLoopDelayMonitor(interval=1)
await asyncio.sleep(1)
time.sleep(2)
await asyncio.sleep(1)
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出
EventLoop delay 0.0013
EventLoop delay 1.0026
EventLoop delay 0.0014
EventLoop delay 0.0015
def monitor_loop(loop, delay_handler):
loop = loop
last_call = loop.time()
INTERVAL = .5 # How often to poll the loop and check the current delay.
def run_last_call_updater():
loop.call_later(INTERVAL, last_call_updater)
def last_call_updater():
nonlocal last_call
last_call = loop.time()
run_last_call_updater()
run_last_call_updater()
def last_call_checker():
threading.Timer(INTERVAL / 2, last_call_checker).start()
if loop.time() - last_call > INTERVAL:
delay_handler(loop.time() - last_call)
threading.Thread(target=last_call_checker).start()